import os
import shlex
import subprocess
import tempfile
from copy import copy
from dataclasses import dataclass
from glob import glob
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from pype.__config__ import (
PYPE_CONDA,
PYPE_DOCKER,
PYPE_OVERLAY_MODE,
PYPE_OVERLAY_SCRATCH,
PYPE_SINGULARITY_CACHE,
PYPE_TMP,
)
from pype.exceptions import CommandError, CommandNamespaceError, EnvModulesError
from pype.misc import generate_uid
from pype.utils import overlay
from pype.utils.containers import get_singularity_image_path, parse_container_reference
[docs]
@dataclass
class DockerConfig:
"""Configuration for Docker and container runtime environments.
This class manages container runtime configuration and provides properties
to identify the runtime type (Docker, Singularity, uDocker).
Attributes:
exec_path: Path to the container runtime executable
runtime: Name of the container runtime ('docker', 'singularity', 'udocker')
cache_dir: Optional cache directory, required for Singularity containers
"""
exec_path: Path
runtime: str
cache_dir: Optional[Path] = None
@property
def is_singularity(self) -> bool:
"""Check if runtime is Singularity."""
return self.runtime == "singularity"
@property
def is_udocker(self) -> bool:
"""Check if runtime is uDocker."""
return self.runtime == "udocker"
[docs]
def get_module_cmd():
"""Get the environment modules command interface."""
try:
modules_home = os.environ.get("MODULESHOME")
if not modules_home:
raise EnvModulesError("No MODULESHOME variable found in the environment")
modules_path = Path(modules_home) / "init/python.py"
if not modules_path.exists():
raise EnvModulesError(f"No python script {modules_path}")
modules = {}
exec(modules_path.read_bytes(), modules)
return modules["module"]
except KeyError as e:
raise EnvModulesError(f"Module command not found: {e}")
[docs]
def program_string(program_dict: Dict[str, str]) -> str:
"""Format program string from dictionary."""
try:
return f"{program_dict['path']}/{program_dict['version']}"
except KeyError:
return f"{program_dict['namespace']}/{program_dict['version']}"
[docs]
@dataclass
class OverlayVolume:
"""Overlay path mapping for an output registered via add_output().
Mirrors the role of Volume for docker (path → bind_file substitution and
harvest tracking), but lives in self.overlay_volumes rather than
self.volumes so it never interferes with container-specific logic.
Attributes:
path: original declared output path
original_dir: parent dir of path (same root-level logic as Volume.to_bind)
scratch_dir: scratch dir that maps to original_dir (harvest source)
bind_file: scratch equivalent of path (substitution target in scripts/cmd)
"""
path: str
original_dir: str
scratch_dir: str
bind_file: str
harvest: bool = True
[docs]
@dataclass
class Volume:
"""Volume binding configuration for containerized environments.
Handles the mounting of files and directories between host and container
environments, managing read-only and read-write access modes.
Attributes:
path: Path to the file or directory on the host system
output: If True, mount as read-write; if False, mount as read-only
bind_prefix: Base path in the container where volumes will be mounted
"""
path: str
output: bool = False
bind_prefix: str = "/var/lib/pype"
def __post_init__(self):
"""Initialize volume configuration after instance creation.
Sets up volume string templates and generates unique binding paths
to avoid conflicts between multiple volumes.
"""
self.path = str(Path(self.path))
self.volume_str = "--volume=%(host_file)s:%(docker_file)s:%(mode)s"
self.mode = "rw" if self.output else "ro"
# For outputs, bind the parent directory, but handle top-level directories specially
if self.output:
parent = os.path.dirname(self.path)
# If parent is root (/), bind the directory itself instead
if parent in ["/", ""]:
self.to_bind = self.path
else:
self.to_bind = parent
else:
self.to_bind = self.path
# Generate random subfolder for binding
random_str = generate_uid(12, False)
bind_base = Path(self.bind_prefix) / random_str / Path(self.to_bind).name
self.bind_volume = str(bind_base)
# When to_bind == path (root-level dir special case), the dir itself is
# the mount point so bind_file == bind_volume; otherwise the file/dir
# sits one level inside the bound parent directory.
if self.output and self.to_bind != self.path:
self.bind_file = str(bind_base / Path(self.path).name)
else:
self.bind_file = self.bind_volume
[docs]
def remove_mode(self) -> None:
"""Remove mode specifier for udocker compatibility."""
self.mode = ""
self.volume_str = "--volume=%(host_file)s:%(docker_file)s"
[docs]
def singularity_volume(self) -> None:
"""Format for Singularity binding syntax."""
self.volume_str = "--bind %(host_file)s:%(docker_file)s:%(mode)s"
[docs]
def replace_bind_volume(self, bind_path):
"""
Replaces the bind volume in the container environment
with the specified `bind path`.
This is useful to manage binding point to multiple
paths (defined in multiple `Volume` classes) that
are subfolders of another bind volume in the host system.
:param bind_path: Binding point to replace instead of the
current one randomly generated by the class.
:type bind_path: str
"""
bind_file_i = os.path.join(bind_path, os.path.basename(self.bind_file))
self.bind_file = bind_file_i
if self.mode == "rw":
self.bind_volume = os.path.dirname(bind_file_i)
else:
self.bind_volume = bind_file_i
[docs]
def replace_bind_dirname(self, bind_path):
"""
Replaces the bind volume in the container environment
with the `dirname` of the specified `bind path`.
This is useful to give the same binding point to multiple
paths (defined in multiple `Volume` classes) that
are in the same folder in the host system.
:param bind_path: Binding point to replace instead of the
current one randomly generated by the class.
:type bind_path: str
"""
bind_file_i = os.path.join(
os.path.dirname(bind_path), os.path.basename(self.bind_file)
)
self.bind_file = bind_file_i
if self.mode == "rw":
self.bind_volume = os.path.dirname(bind_file_i)
else:
self.bind_volume = bind_file_i
[docs]
def to_str(self) -> str:
"""Get formatted volume string."""
return self.volume_str % {
"host_file": self.to_bind,
"docker_file": self.bind_volume,
"mode": self.mode,
}
[docs]
class Command:
"""High-level interface for executing commands with container support.
This class provides a unified interface for running commands locally or
within containers (Docker, Singularity, uDocker). It handles:
- Command execution and piping
- Volume mounting and file access
- Container runtime configuration
- Environment modules integration
- Input/output file tracking
Attributes:
cmd: Command to execute as list of arguments
name: Identifier for the command (used in logs)
profile: Configuration profile for the environment
log: Logger instance for command execution tracking
docker: Container runtime configuration
inputs: List of input files required by the command
outputs: List of output files produced by the command
volumes: List of volumes to mount in container environments
"""
[docs]
def __init__(self, cmd: str, log: Any, profile: Any, name: str = ""):
"""Initialize command execution environment.
Args:
cmd: Command string to execute
log: Logger instance for command output
profile: Environment profile configuration
name: Optional identifier for the command
"""
self.cmd = shlex.split(cmd)
self.name = name
self.profile = profile
self.log = log.log
# Initialize container configuration
self.docker = DockerConfig(
exec_path=Path(PYPE_DOCKER),
runtime=Path(PYPE_DOCKER).name,
cache_dir=(
Path(PYPE_SINGULARITY_CACHE)
if Path(PYPE_DOCKER).name == "singularity"
else None
),
)
# Process configuration
self.stdin = subprocess.PIPE
self.stdout = subprocess.PIPE
self.procin: Optional[Command] = None
# File tracking
self.inputs: List[str] = []
self.outputs: List[str] = []
self._output_harvest: Dict[str, bool] = {}
self.volumes: List[Volume] = []
self.overlay_volumes: List[OverlayVolume] = []
# Runtime configuration
self.uid = os.geteuid
self.gid = os.getegid
self.random_work_dir = Path("/") / generate_uid(6, False)
self.tmp = Path(PYPE_TMP)
if not name:
self.log.warning("Proceeding with unnamed Command")
[docs]
def add_output(self, out_file: str, harvest: bool = True) -> None:
"""Add output file to track."""
if out_file not in self.outputs:
self.outputs.append(out_file)
self._output_harvest[out_file] = harvest
[docs]
def add_volume(self, path: str, output: bool = False) -> None:
"""Add volume to track."""
n_volumes = len(self.volumes)
volume = Volume(path, output=output)
for existing in self.volumes:
if existing.path == volume.path:
if volume.mode == "rw" and existing.mode == "ro":
existing.mode = "rw"
return
# If an overlay volume covers this output's parent dir, redirect the
# docker host-side path to scratch so container writes land there.
if output and self.overlay_volumes:
ov = next(
(ov for ov in self.overlay_volumes if ov.original_dir == volume.to_bind),
None,
)
if ov is not None:
random_str = generate_uid(12, False)
bind_base = (
Path(volume.bind_prefix) / random_str / Path(ov.scratch_dir).name
)
volume.to_bind = ov.scratch_dir
volume.bind_volume = str(bind_base)
volume.bind_file = (
str(bind_base / Path(path).name)
if ov.original_dir != str(Path(path))
else str(bind_base)
)
new_to_bind = volume.to_bind # effective mount root (parent dir for outputs)
for i in range(n_volumes):
path_i = self.volumes[i].to_bind
mode_i = self.volumes[i].mode
has_same_mode = mode_i == volume.mode
if is_direct_child_of(path_i, new_to_bind) and has_same_mode:
self.volumes[i].replace_bind_volume(volume.bind_volume)
elif is_direct_child_of(path, path_i) and has_same_mode:
volume.replace_bind_volume(self.volumes[i].bind_volume)
elif has_same_basedir(path, path_i) and has_same_mode:
volume.replace_bind_dirname(self.volumes[i].bind_volume)
self.volumes.append(volume)
[docs]
def normalize_volumes(self) -> list:
"""Normalize volumes to avoid duplicates for bind mounts.
Returns:
List of unique volumes for bind mounts (deduplicated).
Note: self.volumes is kept intact for path substitution.
"""
_unique_volumes_list_ = []
_to_bind_ = []
_bind_volumes_ = []
for volume in self.volumes:
# Keep volume only if BOTH source path AND container path are unique
if (
volume.bind_volume not in _bind_volumes_
and volume.to_bind not in _to_bind_
):
_unique_volumes_list_.append(volume)
_to_bind_.append(volume.to_bind)
_bind_volumes_.append(volume.bind_volume)
return _unique_volumes_list_
[docs]
def add_namespace(self, namespace: Union[Dict[str, Any], Any]) -> None:
"""Add namespace to command.
Args:
namespace: Program configuration as dict or ProfileProgram object
"""
self.namespace = Namespace(namespace, self.log, self.profile)
def _spawn(self, capture_stderr: bool = False) -> None:
"""Bare Popen on self.cmd — no overlay logic, no logging."""
stderr_pipe = subprocess.PIPE if capture_stderr else None
self.proc = subprocess.Popen(
self.cmd, stdin=self.stdin, stdout=self.stdout, stderr=stderr_pipe
)
self.stdout = self.proc.stdout
def _setup_overlay_volumes(self) -> None:
"""Populate self.overlay_volumes from self.outputs.
One scratch dir is created per unique output parent dir (same root-level
logic as Volume.to_bind). Each declared output gets an OverlayVolume
that maps original path → scratch equivalent.
For local/conda runs replace_values_in_code and the cmd loop in local_run
use these entries directly. For docker runs, add_volume detects them and
redirects the docker Volume's host-side path to scratch transparently.
"""
input_dirs: set = set()
for inp in self.inputs:
p = Path(inp)
parent = str(p.parent)
input_dirs.add(str(p) if parent in ("/", "") else parent)
dir_to_scratch: Dict[str, str] = {}
for out in self.outputs:
p = Path(out)
parent = str(p.parent)
odir = str(p) if parent in ("/", "") else parent
odir_path = Path(odir)
if any(
Path(idir) == odir_path
or Path(idir).is_relative_to(odir_path)
or odir_path.is_relative_to(idir)
for idir in input_dirs
):
continue # shared read/write dir: skip overlay, let tool use real path
if odir not in dir_to_scratch:
dir_to_scratch[odir] = None # placeholder
if not dir_to_scratch:
return
scratch_base = Path(PYPE_OVERLAY_SCRATCH) / generate_uid(12, False)
for i, odir in enumerate(dir_to_scratch):
scratch = scratch_base / f"upper_{i}"
scratch.mkdir(parents=True, exist_ok=True)
dir_to_scratch[odir] = str(scratch)
for out in self.outputs:
p = Path(out)
parent = str(p.parent)
odir = str(p) if parent in ("/", "") else parent
if odir not in dir_to_scratch:
continue # skipped due to input/output overlap
scratch_dir = dir_to_scratch[odir]
rel = out[len(odir):]
bind_file = scratch_dir + rel if rel else scratch_dir
if rel:
out_path = Path(out)
if out_path.is_dir() or not out_path.suffix:
Path(bind_file).mkdir(parents=True, exist_ok=True)
self.overlay_volumes.append(
OverlayVolume(
path=out,
original_dir=odir,
scratch_dir=scratch_dir,
bind_file=bind_file,
harvest=self._output_harvest.get(out, True),
)
)
self._overlay_scratch = str(scratch_base)
self.log.info(f"Overlay: {len(dir_to_scratch)} output dir(s) redirected to scratch")
[docs]
def run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
"""Execute the command.
Args:
local_script: If True, execute as local script
capture_stderr: If True, capture stderr output (default False to maintain current behavior)
"""
if PYPE_OVERLAY_MODE == "overlay" and self.outputs:
self._setup_overlay_volumes()
if hasattr(self, "namespace") and self.namespace.type == "docker":
self.docker_run(local_script)
elif hasattr(self, "namespace") and self.namespace.type == "env_module":
self.module_run(local_script=local_script, capture_stderr=capture_stderr)
elif hasattr(self, "namespace") and self.namespace.type == "conda":
self.conda_run(local_script=local_script, capture_stderr=capture_stderr)
else:
self.local_run(local_script=local_script, capture_stderr=capture_stderr)
[docs]
def docker_run(self, local_script: bool) -> None:
"""Execute command within a container environment.
Handles the complexities of running commands in containers including:
- Volume mounting and path translation
- User permissions
- Working directory setup
- Container-specific command modifications
Args:
local_script: If True, indicates the command is a local script that
needs to be mounted in the container
"""
docker_cwd = tempfile.mkdtemp(dir=self.tmp)
docker_data = {
"user": self.uid(),
"group": self.gid(),
"random_dir": self.random_work_dir,
"docker": self.docker.exec_path,
"cwd": docker_cwd,
}
cmd = copy(self.cmd)
# Handle container command based on runtime
if self.docker.is_udocker:
docker_cmd = (
f"{docker_data['docker']} --quiet run -i --rm "
f"{self.namespace.docker_extra_args} "
f"--user={docker_data['user']}:{docker_data['group']} "
f"--workdir=/var/spool/pype "
f"--volume={docker_data['cwd']}:/var/spool/pype"
)
elif self.docker.is_singularity:
docker_cmd = (
f"{docker_data['docker']} --quiet exec "
f"{self.namespace.docker_extra_args} --contain "
f"--pid --ipc --pwd {docker_data['random_dir']} "
f"--home {docker_data['cwd']}:{docker_data['random_dir']}"
)
else:
docker_cmd = (
f"{docker_data['docker']} run -i --rm "
f"--entrypoint \"\" "
f"{self.namespace.docker_extra_args} "
f"--user={docker_data['user']}:{docker_data['group']} "
f"--workdir=/var/spool/pype "
f"--volume={docker_data['cwd']}:/var/spool/pype:rw"
)
# Handle volumes
for in_file in self.inputs:
self.add_volume(in_file)
for out_file in self.outputs:
self.add_volume(out_file, output=True)
# Always add PYPE_TMP as it's equivalent to TMPDIR
# a common pattern in various tools
self.add_volume(str(self.tmp), output=True)
volumes_files = []
if local_script:
exec_file = self.cmd[-1]
self.add_volume(exec_file)
self.replace_values_in_code(exec_file)
# First, do path substitution using ALL volumes (before deduplication)
for volume in self.volumes:
# Update command arguments if they match volume paths
for i in range(len(cmd)):
if self.cmd[i] == volume.path or self.cmd[i] == volume.to_bind:
cmd[i] = volume.bind_file
# Then get deduplicated volumes for bind mounts
unique_volumes = self.normalize_volumes()
for volume in unique_volumes:
# Adjust volume string based on container runtime
if self.docker.is_udocker:
volume.remove_mode()
elif self.docker.is_singularity:
volume.singularity_volume()
volumes_files.append(volume.to_str())
try:
docker_image = self.namespace.last()
except AttributeError:
self.log.error("A namespace is required before executing a Docker command")
raise Exception("A namespace is required before executing a Docker command")
# Construct final docker command
volumes_files = " ".join(volumes_files)
docker_cmd = f"{docker_cmd} {volumes_files}"
docker_cmd = f"{docker_cmd} {docker_image}"
docker_cmd = shlex.split(docker_cmd)
docker_cmd.extend(cmd)
self.log.info(f"Prepare Docker command {' '.join(docker_cmd)}")
self.log.info(f"Replace {' '.join(self.cmd)} with Docker command")
self.cmd = docker_cmd
self._spawn()
[docs]
def local_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
"""Execute command locally.
Args:
local_script: If True, the last element of self.cmd is a script file
whose content is rewritten for overlay path substitution.
capture_stderr: If True, capture stderr output (default False to maintain current behavior)
"""
if self.overlay_volumes:
if local_script:
self.replace_values_in_code(self.cmd[-1])
for ov in self.overlay_volumes:
for i in range(len(self.cmd)):
if self.cmd[i] == ov.path:
self.cmd[i] = ov.bind_file
elif self.cmd[i] == ov.original_dir:
self.cmd[i] = ov.scratch_dir
self.log.info(f"Prepare {self.name} command line")
self.log.info(" ".join(map(str, self.cmd)))
self.log.info(f"Execute {self.name} with python subprocess.Popen")
self._spawn(capture_stderr)
[docs]
def module_load(self) -> None:
"""
load env_modules dependencies
"""
if len(self.namespace.list) > 0:
module = get_module_cmd()
self.log.info(
f"Purge the module env before loading {self.namespace.namespace}"
)
module("purge")
for modulepath in self.namespace.modulepaths:
self.log.info(f"Use modulepath {modulepath}")
module("use", modulepath)
for nm in self.namespace.list:
self.log.info(f"Add env_module {nm}")
module("add", nm)
[docs]
def module_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
"""Execute command with environment modules.
Args:
local_script: Passed through to local_run for overlay script rewriting.
capture_stderr: If True, capture stderr output
"""
self.module_load()
self.local_run(local_script=local_script, capture_stderr=capture_stderr)
[docs]
def conda_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
"""Execute command in conda environment.
Supports both name-based and path-based conda environments:
- If program.path is set: uses -p <path>/<name>
- Otherwise: uses -n <name>
Args:
local_script: Passed through to local_run for overlay script rewriting.
capture_stderr: If True, capture stderr output
"""
# Determine if path-based or name-based environment
# Access the program object to check for path field
if hasattr(self, "namespace") and hasattr(self.namespace, "program"):
program = self.namespace.program
env_path = getattr(program, "path", None)
else:
env_path = None
env_name = self.namespace.str
if env_path:
# Path-based: conda run -p <path>/<name>
full_path = os.path.join(env_path, env_name)
conda_cmd = f"{PYPE_CONDA} run -p {full_path}"
self.log.info(f"Running command in conda environment at path: {full_path}")
else:
# Name-based: conda run -n <name>
conda_cmd = f"{PYPE_CONDA} run -n {env_name}"
self.log.info(f"Running command in conda environment: {env_name}")
self.cmd = shlex.split(conda_cmd) + self.cmd
self.module_load()
self.local_run(local_script=local_script, capture_stderr=capture_stderr)
[docs]
def pipe_in(self, command: "Command", local_script: bool = False) -> None:
"""Pipe input from another command."""
self.log.info(f"Pipe in {command.name} in {self.name} command")
self.procin = command
self.procin.run(local_script)
self.stdin = self.procin.stdout
[docs]
def release_stdout(self) -> None:
"""Release stdout to terminal (for final commands in execution).
Use this for terminal commands where output should be visible to the user.
"""
self.stdout = None
[docs]
def capture_stdout(self) -> None:
"""Capture stdout for piping (for intermediate commands).
Use this to ensure stdout is captured in subprocess.PIPE for piping to another command.
"""
self.stdout = subprocess.PIPE
[docs]
def child_close(self) -> None:
"""Close child process."""
if self.procin.stdout is not None:
self.procin.stdout.close()
self.log.info(f"Close {self.procin.name} stdout stream")
[docs]
def close(self, ignore_returncode: bool = False) -> Tuple[Any, int]:
"""Close command and return result with exit code.
Args:
ignore_returncode: If False (default), raise exception on non-zero exit.
If True, ignore exit code (legacy behavior).
Returns:
Tuple of (communicate_result, returncode)
Raises:
CommandError: If returncode != 0 and ignore_returncode is False
"""
if self.procin is not None:
self.child_close()
res = self.proc.communicate()
returncode = self.proc.returncode
self.log.info(f"Terminate {self.name} (exit code: {returncode})")
try:
if self.overlay_volumes and returncode == 0:
seen_scratch: set = set()
for ov in self.overlay_volumes:
if ov.harvest and ov.scratch_dir not in seen_scratch and Path(ov.scratch_dir).exists():
self.log.info(f"Harvest overlay {ov.scratch_dir} → {ov.original_dir}")
overlay.harvest(Path(ov.scratch_dir), Path(ov.original_dir))
seen_scratch.add(ov.scratch_dir)
finally:
if hasattr(self, "_overlay_scratch"):
overlay.cleanup(Path(self._overlay_scratch))
if not ignore_returncode and returncode != 0:
self.log.error(f"{self.name} failed with exit code {returncode}")
raise CommandError(
f"Command '{self.name}' failed with exit code {returncode}",
command=" ".join(map(str, self.cmd)),
exit_code=returncode,
)
return res, returncode
[docs]
def replace_values_in_code(self, code_file: str) -> None:
"""Replace path mappings in a script file.
Processes self.volumes (docker container paths) first, then
self.overlay_volumes (scratch paths). seen_paths ensures a path
is only substituted once — for docker runs this means docker volumes
take precedence and overlay entries for the same path are skipped.
"""
code_lines = ""
with open(code_file, "rt") as code:
for line in code:
seen_paths: set = set()
for volume in self.volumes:
if volume.path not in seen_paths:
line = line.replace(volume.path, volume.bind_file)
seen_paths.add(volume.path)
for ov in self.overlay_volumes:
if ov.path not in seen_paths:
line = line.replace(ov.path, ov.bind_file)
seen_paths.add(ov.path)
code_lines += line
with open(code_file, "wt") as code:
code.write(code_lines)
os.chmod(code_file, 0o760)
[docs]
def is_direct_child_of(x: str, y: str) -> bool:
"""Check if x is a direct child of directory y."""
path_a = Path(x).resolve()
path_b = Path(y).resolve()
return path_a.parent == path_b
[docs]
def has_same_basedir(x: str, y: str) -> bool:
"""Check if x and y have the same base directory."""
path_a = Path(x).resolve().parent
path_b = Path(y).resolve().parent
return path_a == path_b
[docs]
class Namespace:
"""Environment and dependency management system.
Manages different execution environments (path-based, environment modules,
containers) and their dependencies. Supports:
- Path-based program execution
- Environment modules loading
- Container image specification
- Dependency resolution
The namespace format is 'type@item' where:
- type: One of 'path', 'env_module', or 'docker'
- item: The specific program/container/module to use
For example:
- 'docker@ubuntu:latest'
- 'env_module@gcc/9.3.0'
- 'path@/usr/local/bin/python'
"""
[docs]
def __init__(
self,
program_dict: Union[Dict[str, Any], Any],
log: Any,
profile: Any,
):
"""Initialize namespace configuration.
Parses a program configuration and sets up namespace properties for command execution.
Supports both dict and ProfileProgram object inputs.
Args:
program_dict: Program configuration as dict or ProfileProgram object with keys:
- namespace: String in format 'type@item' where type is one of:
'docker', 'env_module', or 'path'
- version: Program version (optional for path type)
- dependencies: Optional list of dependent program names
log: Logger instance for namespace operations
profile: Environment profile containing program configurations
Attributes set:
type: Namespace type ('docker', 'env_module', or 'path')
namespace: Original namespace string (e.g., 'docker@ubuntu')
str: Parsed namespace item (e.g., 'ubuntu')
version: Program version string
list: List of resolved namespace items (empty until processing)
docker_extra_args: Extra arguments for docker/singularity commands
Raises:
CommandNamespaceError: For invalid namespace format or unsupported types
"""
self.type: Optional[str] = None
self.list: List[str] = []
self.modulepaths: List[str] = []
self.docker_extra_args = ""
self.version = None
self.namespace = None
self.str = None
self.log = log
# Extract values from either ProfileProgram object or dict
# Use duck typing to avoid circular imports - check for namespace attribute
if hasattr(program_dict, "namespace") and not isinstance(program_dict, dict):
# It's a ProfileProgram object
namespace_str = program_dict.namespace
version_str = program_dict.version
deps_list = program_dict.dependencies or []
else:
# It's a dict
namespace_str = program_dict["namespace"]
version_str = program_dict["version"]
deps_list = program_dict.get("dependencies", [])
# Store reference to program object for accessing additional fields
# (like 'path' for conda environments, 'environment' for conda specs)
self.program = program_dict
self.namespace = namespace_str
self.version = version_str
namespace_list = namespace_str.split("@")
if len(namespace_list) == 2:
self.type = namespace_list[0]
self.log.info(f"Set namespace to {self.type}")
namespace_item = namespace_list[1]
elif len(namespace_list) == 1:
self.type = "path"
self.log.info(f"Set namespace to {self.type}")
namespace_item = namespace_list[0]
else:
self.log.error("Wrong Namespace format")
raise CommandNamespaceError("Wrong Namespace format")
self.str = namespace_item
supported_namespaces = ("path", "env_module", "docker", "conda")
if self.type not in supported_namespaces:
self.log.error(f"Not supported namespace names {self.type}")
raise CommandNamespaceError(f"Not supported namespace {self.type}")
for dep in deps_list:
dep_dict = profile.programs[dep]
dep_nm = Namespace(dep_dict, self.log, profile)
if dep_nm.type != "env_module":
self.log.error("All dependencies must be type env_module")
raise CommandNamespaceError(
"All dependencies must be type path or env_module"
)
self.list += dep_nm.list
self.modulepaths += dep_nm.modulepaths
if self.type == "env_module":
self.list.append(
program_string({"namespace": namespace_item, "version": version_str})
)
modulepath = getattr(self.program, "modulepath", None) or (
self.program.get("modulepath") if isinstance(self.program, dict) else None
)
if modulepath:
self.modulepaths.append(modulepath)
elif self.type == "docker":
docker_cmd = PYPE_DOCKER
docker_runtime = Path(docker_cmd).name
# Extract extra_args from either ProfileProgram object or dict
if hasattr(program_dict, "extra_args"):
# ProfileProgram object
extra_args = program_dict.extra_args
else:
# Dict
extra_args = program_dict.get("extra_args")
if extra_args:
self.docker_extra_args = extra_args
if docker_runtime == "singularity":
# Use shared container utilities for consistent path construction
# Extract version from either ProfileProgram object or dict
version = (
program_dict.version
if hasattr(program_dict, "version")
else program_dict["version"]
)
nm = get_singularity_image_path(
namespace_item,
version,
PYPE_SINGULARITY_CACHE,
)
else:
# Docker/uDocker - use standard image:tag format with full registry
# Extract version from either ProfileProgram object or dict
version = (
program_dict.version
if hasattr(program_dict, "version")
else program_dict["version"]
)
registry, image_path = parse_container_reference(namespace_item)
nm = f"{registry}/{image_path}:{version}"
self.list.append(str(nm))
elif self.type == "path":
self.list.append(namespace_item)
[docs]
def first(self) -> str:
return self.list[0]
[docs]
def last(self) -> str:
return self.list[-1]