import os
import subprocess
import shlex
import random
import string
import tempfile
from copy import copy
from datetime import datetime
from glob import glob
from hashlib import sha256
from pype.__config__ import PYPE_TMP, PYPE_DOCKER, PYPE_SINGULARITY_CACHE
from pype.exceptions import EnvModulesError, CommandNamespaceError
def get_module_cmd():
    """
    Return the ``module`` callable of the Environment Modules python
    bindings.

    The bindings are read from ``$MODULESHOME/init/python.py`` and executed
    in a private namespace; whatever that script binds to the name
    ``module`` is returned.

    :return: The object bound to ``module`` by the init script
    :raises EnvModulesError: if ``MODULESHOME`` is unset or empty, if the
        init script does not exist, or if the script does not define a
        ``module`` name.
    """
    modules_home = os.environ.get('MODULESHOME')
    if not modules_home:
        raise EnvModulesError(
            "No MODULESHOME variable found in the environment")
    modules_path = os.path.join(modules_home, 'init/python.py')
    if not os.path.exists(modules_path):
        raise EnvModulesError("No python script %s" % modules_path)
    modules = {}
    with open(modules_path, 'rb') as env_modules:
        # NOTE(security): this executes the content of a file located
        # through an environment variable; MODULESHOME must be trusted.
        exec(env_modules.read(), modules)
    try:
        return modules['module']
    except KeyError as e:
        # Keep the original KeyError attached as the cause.
        raise EnvModulesError(e) from e
def module_avail(module=None, modulepath=None):
    """
    Run ``module avail`` in a shell and yield what it writes to standard
    error, one decoded line at a time.

    :param module: Optional text appended to ``module avail``; skipped
        when empty or `None`.
    :type module: str, optional
    :param modulepath: Optional path; when not `None`, ``module use`` is
        run on it before ``module avail``.
    :type modulepath: str, optional
    :return: Generator of UTF-8 decoded lines read from the process
        standard error
    :rtype: generator
    """
    command = 'module avail'
    if module:
        command = '%s %s' % (command, module)
    if modulepath is not None:
        command = 'module use %s && %s' % (modulepath, command)
    # NOTE(security): both arguments are interpolated unquoted into a
    # shell command line; they must not come from untrusted input.
    # The context manager closes the pipe and waits for the child once
    # the generator is exhausted or discarded, so nothing is leaked.
    with subprocess.Popen(command, shell=True,
                          stderr=subprocess.PIPE) as call_proc:
        for line in call_proc.stderr:
            yield line.decode('utf-8')
def program_string(program_dict):
    """
    Build the ``prefix/version`` string of a program definition.

    The prefix is the value of the `path` key; when looking up `path`
    (or `version`) fails, the `namespace` key is used instead.

    :param program_dict: Program definition with a `version` key and
        either a `path` or a `namespace` key
    :type program_dict: dict
    :return: The prefix and the version joined by ``/``
    :rtype: str
    :raises KeyError: if `version` is missing, or if neither `path` nor
        `namespace` is available.
    """
    try:
        prefix = program_dict['path']
        version = program_dict['version']
    except KeyError:
        prefix = program_dict['namespace']
        version = program_dict['version']
    return '{}/{}'.format(prefix, version)
def is_in_directory(x, y):
    """
    Tell whether path `x` lies below directory `y`.

    Both paths are resolved with :func:`os.path.realpath`. A trailing
    separator is appended to `y` before comparing, so a sibling that merely
    shares a name prefix does not match, and neither does `y` itself.

    :param x: Path to test
    :type x: str
    :param y: Candidate parent directory
    :type y: str
    :return: `True` when the resolved `x` starts with the resolved `y`
        followed by a path separator
    :rtype: bool
    """
    parent = os.path.join(os.path.realpath(y), '')
    candidate = os.path.realpath(x)
    return candidate.startswith(parent)
def has_same_basedir(x, y):
    """
    Tell whether two paths sit in the same parent directory.

    Each path is resolved with :func:`os.path.realpath` before its
    directory name is taken.

    :param x: First path
    :type x: str
    :param y: Second path
    :type y: str
    :return: `True` when both resolved paths have the same directory name
    :rtype: bool
    """
    parent_x, parent_y = (
        os.path.dirname(os.path.realpath(item)) for item in (x, y))
    return parent_x == parent_y
class Namespace:
    """
    A mechanism to load different environments

    Define a basic abstraction layer to load programs and environments
    to the :class:`Command` class
    """

    def __init__(self, program_dict, log, profile):
        """
        Parse a program definition into a namespace type (``self.type``)
        and the list of items to load for it (``self.list``).

        :param program_dict: A dictionary with the keys `namespace`,
            `version` and, optionally, `dependencies`.
            `namespace` is a string composed by the namespace type and
            the namespace item, separated by the `@` character; a string
            without `@` is taken as an item of type `path`.
            The supported namespace types are `docker`, `env_module` and
            `path`.
            `version` is read for the `docker` and `env_module` types
            only. For `env_module` the stored item is ``item/version``.
            For `docker` it is ``item:version``, or the file
            ``item_version.sif`` inside `PYPE_SINGULARITY_CACHE` when the
            basename of `PYPE_DOCKER` is `singularity`.
            `dependencies` is only used by the `env_module` type: each
            entry is looked up in ``profile.programs`` and its items are
            stored ahead of the program's own item.
        :type program_dict: dict
        :param log: Log object; its `info` and `error` methods are used
        :type log: :class:`pype.logger.PypeLogger`
        :param profile: Profile object; only its `programs` mapping is
            read, to resolve dependencies
        :type profile: :class:`pype.utils.profiles.Profile`
        :raises CommandNamespaceError: `Wrong Namespace format` if the
            namespace contains more than one `@` character.
        :raises CommandNamespaceError: `Not supported namespace` if the
            namespace type is not `docker`, `env_module` or `path`.
        :raises CommandNamespaceError: `All dependencies must be type
            env_module` if any dependency is not a namespace of the
            `env_module` type.
        """
        self.type = None
        self.list = []
        self.log = log
        tokens = program_dict['namespace'].split('@')
        if len(tokens) > 2:
            self.log.error('Wrong Namespace format')
            raise CommandNamespaceError('Wrong Namespace format')
        # A bare item (no `@`) is shorthand for the `path` type.
        self.type = tokens[0] if len(tokens) == 2 else 'path'
        self.log.info('Set namespace to %s' % self.type)
        namespace_item = tokens[-1]
        if self.type not in ('path', 'env_module', 'docker'):
            self.log.error(
                'Not supported namespace names %s' % self.type)
            raise CommandNamespaceError(
                'Not supported namespace %s' % self.type)
        if self.type == 'path':
            self.list.append(namespace_item)
        elif self.type == 'docker':
            if os.path.basename(PYPE_DOCKER) == 'singularity':
                # Singularity runs an image file from the local cache.
                image = os.path.join(
                    PYPE_SINGULARITY_CACHE,
                    '%s_%s.sif' % (namespace_item, program_dict['version']))
            else:
                image = '%s:%s' % (namespace_item, program_dict['version'])
            self.list.append(image)
        else:
            # env_module: dependencies first, the program itself last.
            for dep in program_dict.get('dependencies', []):
                dep_nm = Namespace(profile.programs[dep], self.log, profile)
                if dep_nm.type != 'env_module':
                    self.log.error(
                        'All dependencies must be type env_module')
                    raise CommandNamespaceError(
                        'All dependencies must be type env_module')
                self.list.extend(dep_nm.list)
            own_module = {
                'namespace': namespace_item,
                'version': program_dict['version'],
            }
            self.list.append(program_string(own_module))

    def first(self):
        """
        Return the first item stored in the namespace list.

        :return: First element of ``self.list``
        :rtype: str
        """
        return self.list[0]
class Volume:
    """
    Volume class to abstract and parametrize the binding of files
    while running commands in containerized environments.

    The class also contains methods to adjust the bind volume argument
    to implementations such as `udocker` and `singularity`.
    """

    def __init__(self, path, output=False, bind_prefix='/var/lib/pype'):
        """
        Define the path in the host environment and the matching path in
        the container environment, flagging whether the path is an input
        or an output target.

        An input is bound as-is in read-only (`ro`) mode. For an output
        the parent directory of `path` is bound in read-write (`rw`) mode
        and `bind_file` points at the file inside that directory.
        The container side path is built as
        ``bind_prefix/<sha256 hex digest>/<basename of the bound path>``,
        the digest being computed from the current date-time (one second
        resolution) followed by `path`.

        :param path: File or directory to bind in the host system
        :type path: str
        :param output: Set to `True` if `path` is an output target,
            defaults to `False`.
        :type output: bool, optional
        :param bind_prefix: Prefix path in the container environment,
            defaults to '/var/lib/pype'.
        :type bind_prefix: str, optional
        """
        self.path = path
        self.volume_str = '--volume=%(host_file)s:%(docker_file)s:%(mode)s'
        timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        digest = sha256(
            ('%s%s' % (timestamp, path)).encode('utf-8')).hexdigest()
        is_output = output is True
        if is_output:
            self.mode = 'rw'
            self.to_bind = os.path.dirname(path)
        else:
            self.mode = 'ro'
            self.to_bind = path
        self.bind_volume = os.path.join(
            bind_prefix, digest, os.path.basename(self.to_bind))
        if is_output:
            self.bind_file = os.path.join(
                self.bind_volume, os.path.basename(self.path))
        else:
            self.bind_file = self.bind_volume

    def remove_mode(self):
        """
        Remove the trailing mode (eg the ending `:rw`) from
        the binding string and clear the stored mode.
        """
        self.mode = ''
        self.volume_str = '--volume=%(host_file)s:%(docker_file)s'

    def singularity_volume(self):
        """
        Format the volume binding string following the `singularity`
        command line syntax (``--bind host:container:mode``).
        """
        self.volume_str = '--bind %(host_file)s:%(docker_file)s:%(mode)s'

    def replace_bind_volume(self, bind_path):
        """
        Replace the bind volume in the container environment
        with the specified `bind_path`.

        The basename of the current `bind_file` is kept and joined to
        `bind_path`; both `bind_file` and `bind_volume` are set to the
        result.
        This is useful to manage binding points to multiple
        paths (defined in multiple `Volume` classes) that
        are subfolders of another bind volume in the host system.

        :param bind_path: Binding point to use instead of the
            current one generated by the class.
        :type bind_path: str
        """
        self._rebase(bind_path)

    def replace_bind_dirname(self, bind_path):
        """
        Replace the bind volume in the container environment
        with the `dirname` of the specified `bind_path`.

        The basename of the current `bind_file` is kept and joined to the
        directory name of `bind_path`; both `bind_file` and `bind_volume`
        are set to the result.
        This is useful to give the same binding point to multiple
        paths (defined in multiple `Volume` classes) that
        are in the same folder in the host system.

        :param bind_path: Binding point whose directory is used instead
            of the current one generated by the class.
        :type bind_path: str
        """
        self._rebase(os.path.dirname(bind_path))

    def _rebase(self, container_dir):
        # Move the container side target under `container_dir`, keeping
        # the file name; both attributes end up with the same value.
        relocated = os.path.join(
            container_dir, os.path.basename(self.bind_file))
        self.bind_file = relocated
        self.bind_volume = relocated

    def to_str(self):
        """
        Return the bind volume argument built from the current content
        of the class.

        :return: Bind volume string
        :rtype: str
        """
        fields = dict(
            host_file=self.to_bind,
            docker_file=self.bind_volume,
            mode=self.mode)
        return self.volume_str % fields
class Command:
    """
    High level class to use :class:`subprocess.Popen` combined with
    :class:`Volume` and :class:`Namespace` classes.

    The :class:`Command` class is a wrapper around :class:`subprocess.Popen`
    that results in more succinct code, increasing the readability of
    the command lines that are going to be executed rather than the
    :class:`subprocess.Popen` boilerplate.
    """

    def __init__(self, cmd, log, profile, name=''):
        """
        The class initialization requires the command line string,
        a :class:`Profile` class and a log object (eg the `snippet`
        log object).

        :param cmd: Command line string, split with :func:`shlex.split`
        :type cmd: str
        :param log: Log class of the running snippet; its `log`
            attribute is the logger actually used
        :type log: :class:`pype.logger.PypeLogger`
        :param profile: A Profile object
        :type profile: :class:`pype.utils.profiles.Profile`
        :param name: String used to identify the process in the log,
            defaults to ''
        :type name: str, optional
        """
        self.cmd = shlex.split(cmd)
        self.name = name
        self.profile = profile
        self.stdin = subprocess.PIPE
        self.stdout = subprocess.PIPE
        self.procin = None
        self.docker_exec = PYPE_DOCKER
        self.docker_runtime = os.path.basename(self.docker_exec)
        self.inputs = []
        self.outputs = []
        self.volumes = []
        # Stored as callables: they are only invoked when the container
        # command line is built.
        self.uid = os.geteuid
        self.gid = os.getegid
        self.log = log.log
        self.random_dir = '/' + ''.join(
            random.choice(string.ascii_letters) for _ in range(6))
        self.tmp = PYPE_TMP
        if self.name == '':
            self.log.warning('Proceeding with unnamed Command')

    def add_output(self, out_file):
        """
        Register an output file of the command, ignoring files that are
        already registered.

        :param out_file: Path of the output file
        :type out_file: str
        """
        if out_file not in self.outputs:
            self.outputs.append(out_file)

    def add_volume(self, path, output=False):
        """
        Create a :class:`Volume` for `path` and append it to the list of
        volumes, aligning container side paths with the volumes already
        registered in the same mode.

        For each registered volume with the same mode:

        * if its host path is inside `path`, the registered volume is
          rebased under the new volume (skipped for `singularity`);
        * if `path` is inside its host path, the new volume is rebased
          under the registered one (skipped for `singularity`);
        * if the two paths share the parent directory, the new volume is
          placed next to the registered one.

        :param path: File or directory in the host system
        :type path: str
        :param output: Set to `True` if `path` is an output target,
            defaults to False
        :type output: bool, optional
        """
        is_singularity = self.docker_runtime == 'singularity'
        volume = Volume(path, output=output)
        for known in self.volumes:
            path_i = known.to_bind
            has_same_mode = known.mode == volume.mode
            if is_in_directory(path_i, path) and has_same_mode:
                bind_rel_path = os.path.join(
                    volume.bind_volume, os.path.dirname(
                        os.path.relpath(path_i, path)))
                if not is_singularity:
                    known.replace_bind_volume(bind_rel_path)
            elif is_in_directory(path, path_i) and has_same_mode:
                bind_rel_path = os.path.join(
                    known.bind_volume, os.path.dirname(
                        os.path.relpath(path, path_i)))
                if not is_singularity:
                    volume.replace_bind_volume(bind_rel_path)
            elif has_same_basedir(path, path_i) and has_same_mode:
                volume.replace_bind_dirname(known.bind_volume)
        self.volumes.append(volume)

    def normalize_volumes(self):
        """
        Drop the volumes that would bind an already bound host path or an
        already used container path, keeping the first occurrence and the
        original order.
        """
        unique_volumes = []
        seen_host_paths = set()
        seen_bind_volumes = set()
        for volume in self.volumes:
            if volume.bind_volume in seen_bind_volumes \
                    or volume.to_bind in seen_host_paths:
                continue
            unique_volumes.append(volume)
            seen_host_paths.add(volume.to_bind)
            seen_bind_volumes.add(volume.bind_volume)
        self.volumes = unique_volumes

    def add_namespace(self, namespace):
        """
        Build the :class:`Namespace` the command is going to run in and
        store it in the `namespace` attribute.

        :param namespace: Program definition handed to
            :class:`Namespace`
        :type namespace: dict
        """
        self.namespace = Namespace(namespace, self.log, self.profile)

    def _path_substitutions(self, declared_volumes):
        """
        Return the ``(host_path, container_path)`` pairs to apply to the
        command line, lowest precedence first.

        Volumes kept by :meth:`normalize_volumes` always take part.
        Volumes dropped as duplicates take part too, but only when their
        container path lies under a mount that is kept. Such a volume
        was rebased onto that mount by :meth:`add_volume`, so its path is
        reachable in the container even though it needs no bind argument
        of its own.

        :param declared_volumes: The volumes as they were before
            normalization
        :type declared_volumes: list
        :return: List of ``(host_path, container_path)`` tuples
        :rtype: list
        """
        kept_ids = {id(volume) for volume in self.volumes}
        mounts = [
            os.path.join(volume.bind_volume, '') for volume in self.volumes]
        pairs = []
        for volume in declared_volumes:
            if id(volume) in kept_ids:
                continue
            target = os.path.join(volume.bind_file, '')
            if any(target.startswith(mount) for mount in mounts):
                pairs.append((volume.path, volume.bind_file))
        pairs.extend(
            (volume.path, volume.bind_file) for volume in self.volumes)
        return pairs

    def docker(self, local_script):
        """
        Replace the command line with the equivalent container command
        line for the configured runtime (`udocker`, `singularity`, or
        `docker` syntax otherwise).

        A temporary directory is created in ``self.tmp`` and bound as the
        working directory of the container. The registered inputs and
        outputs are turned into volumes, and every argument of the
        command equal to a volume host path is replaced by its path in
        the container.

        :param local_script: When `True` the first item of the command is
            a script in the host system: it is bound in the container and
            rewritten in place by :meth:`replace_values_in_code`.
        :type local_script: bool
        :raises Exception: if no namespace was added to the command
        """
        docker_cwd = tempfile.mkdtemp(dir=self.tmp)
        docker_data = {
            'user': self.uid(),
            'group': self.gid(),
            'random_dir': self.random_dir,
            'docker': self.docker_exec,
            'cwd': docker_cwd,
        }
        cmd = copy(self.cmd)
        if self.docker_runtime == 'udocker':
            docker_cmd = (
                '%(docker)s --quiet run -i --rm '
                '--user=%(user)i:%(group)i '
                '--workdir=/var/spool/pype '
                '--volume=%(cwd)s:/var/spool/pype') % docker_data
        elif self.docker_runtime == 'singularity':
            docker_cmd = (
                '%(docker)s --quiet exec '
                '--contain --pid --ipc '
                '--pwd %(random_dir)s '
                '--home %(cwd)s:%(random_dir)s') % docker_data
        else:
            docker_cmd = (
                '%(docker)s run -i --rm '
                '--user=%(user)i:%(group)i '
                '--workdir=/var/spool/pype '
                '--volume=%(cwd)s:/var/spool/pype:rw') % docker_data
        for in_file in self.inputs:
            self.add_volume(in_file)
        for out_file in self.outputs:
            self.add_volume(out_file, output=True)
        if local_script is True:
            exec_file = self.cmd[0]
            self.add_volume(exec_file)
            self.replace_values_in_code(
                exec_file)
            # TODO, check the content of exec_file and rewrite it
            # replacing input/output file to bind volumes paths
            # As it's done now for the command line
            # NOTE: rewriting the code_file of CodeChunk is not elegant, but
            # it's 1st solution that came to mind without restructuring
            # the current structure of Command and CodeChunk
            # potentially, we could wrap CodeChunk inside Command, so in case
            # of volume binding in containers it will deal with changing the
            # argument and write the "right" code from the start
        # Remember every declared volume: the ones dropped below as
        # duplicated binds still map a host path of the command line to
        # a path inside a mount that is kept.
        declared_volumes = list(self.volumes)
        self.normalize_volumes()
        for host_path, container_path in self._path_substitutions(
                declared_volumes):
            for i, token in enumerate(self.cmd):
                if host_path == token:
                    cmd[i] = container_path
        volumes_files = []
        for volume in self.volumes:
            if self.docker_runtime == 'udocker':
                volume.remove_mode()
            elif self.docker_runtime == 'singularity':
                volume.singularity_volume()
            volumes_files.append(volume.to_str())
        try:
            docker_image = self.namespace.first()
        except AttributeError:
            self.log.error('A namespace is required before '
                           'executing a Docker command')
            raise Exception('A namespace is required before '
                            'executing a Docker command')
        # NOTE(review): the command line is assembled as one string and
        # split again, so host paths containing whitespace are broken
        # into several arguments.
        docker_cmd = '%s %s' % (docker_cmd, ' '.join(volumes_files))
        docker_cmd = '%s %s' % (docker_cmd, docker_image)
        docker_cmd = shlex.split(docker_cmd)
        docker_cmd += cmd
        self.log.info('Prepare Docker command %s' % ' '.join(docker_cmd))
        self.log.info('Replace %s with Docker command' % ' '.join(self.cmd))
        self.cmd = docker_cmd

    def run(self, local_script=False):
        """
        Prepare the environment of the namespace and start the command
        with :class:`subprocess.Popen`.

        Without a namespace the command runs as a `path` namespace, after
        logging a warning. A `docker` namespace rewrites the command line
        through :meth:`docker`; an `env_module` namespace loads every
        item of the namespace with ``module('add', item)``.
        The started process is stored in `proc`, and `stdout` is replaced
        by the output stream of the process.

        :param local_script: Forwarded to :meth:`docker`, defaults to
            False
        :type local_script: bool, optional
        """
        try:
            namespace = self.namespace.type
        except AttributeError:
            self.log.warning('A namespace is required before '
                             'executing a command, continuing '
                             'with namespace="path"')
            namespace = 'path'
        if namespace == 'docker':
            self.docker(local_script)
        elif namespace == 'env_module':
            module = get_module_cmd()
            for nm in self.namespace.list:
                self.log.info('Add env_module %s' % nm)
                module('add', nm)
        try:
            # Attempt to close the pipe if
            # the process was piped in earlier:
            # Ensure communication of broken pipes
            self.procin.child_close()
        except AttributeError:
            # Nothing was piped in (procin is None).
            pass
        self.log.info('Prepare %s command line' % self.name)
        self.log.info(' '.join(map(str, self.cmd)))
        self.log.info(
            'Execute %s with python subprocess.Popen' % self.name)
        self.proc = subprocess.Popen(
            self.cmd, stdin=self.stdin, stdout=self.stdout)
        self.stdout = self.proc.stdout

    def pipe_in(self, command, local_script=False):
        """
        Start `command` and use its standard output as the standard input
        of this command.

        :param command: The command to pipe in
        :type command: :class:`Command`
        :param local_script: Forwarded to the `run` method of `command`,
            defaults to False
        :type local_script: bool, optional
        """
        self.log.info('Pipe in %s in %s command' % (
            command.name, self.name))
        self.procin = command
        self.procin.run(local_script)
        self.stdin = self.procin.stdout

    def child_close(self):
        """
        Close the standard output stream of the piped in command, if any.

        This is a best effort: nothing happens when no command was piped
        in or when its `stdout` cannot be closed.
        """
        try:
            self.procin.stdout.close()
            self.log.info(
                'Close %s stdout stream' % self.procin.name)
        except AttributeError:
            pass

    def close(self):
        """
        Close the stream of the piped in command, if any, and wait for
        the process to terminate.

        :return: The result of :meth:`subprocess.Popen.communicate`
        :rtype: tuple
        """
        if self.procin is not None:
            self.child_close()
        res = self.proc.communicate()
        self.log.info('Terminate %s' % self.name)
        return res

    def replace_values_in_code(self, code_file):
        """
        Rewrite `code_file` in place, replacing every occurrence of the
        host path of each registered volume with its path in the
        container, then set the file permissions to `0o760`.

        :param code_file: Path of the script to rewrite
        :type code_file: str
        """
        rewritten = []
        with open(code_file, 'rt') as code:
            for line in code:
                for volume in self.volumes:
                    line = line.replace(volume.path, volume.bind_file)
                rewritten.append(line)
        with open(code_file, 'wt') as code:
            code.writelines(rewritten)
        os.chmod(code_file, 0o760)