# Source code for pype.process

import os
import subprocess
import shlex
import random
import string
import tempfile
from copy import copy
from datetime import datetime
from glob import glob
from hashlib import sha256
from pype.__config__ import PYPE_TMP, PYPE_DOCKER, PYPE_SINGULARITY_CACHE
from pype.exceptions import EnvModulesError, CommandNamespaceError


def get_module_cmd():
    """
    Load the Environment Modules python binding and return its `module`
    function.

    The binding is the `init/python.py` script located under the directory
    named by the `MODULESHOME` environment variable. The script is executed
    in a fresh namespace and the `module` callable it defines is returned.

    NOTE: the script is run with `exec`, so whoever controls `MODULESHOME`
    controls code executed in this process.

    :return: The `module` function defined by the init script
    :rtype: callable
    :raises EnvModulesError: if `MODULESHOME` is unset or empty, if
        `init/python.py` does not exist under it, or if the script does not
        define `module`.
    """
    modules_home = os.environ.get('MODULESHOME')
    if not modules_home:
        raise EnvModulesError(
            "No MODULESHOME variable found in the environment")
    modules_path = os.path.join(modules_home, 'init/python.py')
    if not os.path.exists(modules_path):
        raise EnvModulesError("No python script %s" % modules_path)
    modules = {}
    with open(modules_path, 'rb') as env_modules:
        # Compile with the real filename so tracebacks point at the script.
        exec(compile(env_modules.read(), modules_path, 'exec'), modules)
    try:
        return modules['module']
    except KeyError as err:
        raise EnvModulesError(err) from err


def module_avail(module=None, modulepath=None):
    """
    Run `module avail` through the shell and yield its stderr line by line.

    The command goes through the shell (`module` is typically a shell
    function). The listing is read from the process stderr; stdout is
    inherited from the caller. `module` and `modulepath` are shell-quoted,
    so they are always passed as single literal arguments.

    :param module: Optional module name (or prefix) restricting the listing
    :type module: str, optional
    :param modulepath: Optional directory registered with `module use`
        before listing
    :type modulepath: str, optional
    :return: Generator of decoded stderr lines (line endings kept)
    :rtype: generator of str
    """
    command = 'module avail'
    if module:
        command = '%s %s' % (command, shlex.quote(str(module)))
    if modulepath is not None:
        command = 'module use %s && %s' % (
            shlex.quote(str(modulepath)), command)
    call_proc = subprocess.Popen(command, shell=True,
                                 stderr=subprocess.PIPE)
    try:
        for line in call_proc.stderr:
            yield line.decode('utf-8', errors='replace')
    finally:
        # Runs on exhaustion and on early generator close: release the
        # pipe and reap the child so it does not linger as a zombie.
        call_proc.stderr.close()
        call_proc.wait()


def program_string(program_dict):
    """
    Build the `<prefix>/<version>` string of a program definition.

    The prefix is `program_dict['path']` when both `path` and `version`
    are available; otherwise `program_dict['namespace']` is used.

    :param program_dict: Program definition holding `version` and either
        `path` or `namespace`
    :type program_dict: dict
    :return: `<path or namespace>/<version>`
    :rtype: str
    :raises KeyError: if the fallback keys are missing as well
    """
    try:
        prefix, version = program_dict['path'], program_dict['version']
    except KeyError:
        prefix, version = program_dict['namespace'], program_dict['version']
    return '%s/%s' % (prefix, version)


def is_in_directory(x, y):
    """
    Tell whether path `x` lies strictly below directory `y`.

    Both paths are resolved with :func:`os.path.realpath` first. The check
    is textual: the resolved `x` must start with the resolved `y` followed
    by a path separator, so a directory is not considered inside itself.

    :param x: Candidate path
    :param y: Directory path
    :return: `True` if `x` is below `y`
    :rtype: bool
    """
    directory = os.path.join(os.path.realpath(y), '')
    return os.path.realpath(x).startswith(directory)


def has_same_basedir(x, y):
    """
    Tell whether `x` and `y` resolve to entries of the same directory.

    Each path is resolved with :func:`os.path.realpath` and the parent
    directories of the resolved paths are compared.

    :param x: First path
    :param y: Second path
    :return: `True` if both resolved paths share the same parent directory
    :rtype: bool
    """
    dir_x, dir_y = (
        os.path.dirname(os.path.realpath(p)) for p in (x, y))
    return dir_x == dir_y


class Namespace:
    """
    A mechanism to load different environments.

    Defines a basic abstraction layer to load programs and environments
    for the :class:`Command` class. After construction, :attr:`type` holds
    the namespace type and :attr:`list` the items to load for it.
    """

    def __init__(self, program_dict, log, profile):
        """
        Parse a program definition into a namespace type and its items.

        :param program_dict: A dictionary with the keys `namespace`,
            `version` and, optionally, `dependencies`. `namespace` is either
            a bare item (implying the `path` type) or the namespace type and
            the namespace item separated by a single `@` character. The
            supported namespace types are `path`, `env_module` and `docker`.
            The namespace item is interpreted according to the type (eg. the
            docker image repository for `docker`). `version` is the image
            tag for `docker` or the module version for `env_module`; it is
            not read for `path`. `dependencies` is only read for the
            `env_module` type: it lists names of other programs in
            `profile.programs`, which must themselves be `env_module`
            namespaces and whose modules are placed before this one.
            A missing or `None` `dependencies` means no dependencies.
        :type program_dict: dict
        :param log: Logger-like object exposing `info` and `error`
        :param profile: Profile object; only its `programs` mapping is read,
            to resolve `env_module` dependencies.
        :type profile: :class:`pype.utils.profiles.Profile`
        :raises CommandNamespaceError: `Wrong Namespace format` if the
            namespace contains more than one `@` character.
        :raises CommandNamespaceError: `Not supported namespace` if the
            namespace type is not `path`, `env_module` or `docker`.
        :raises CommandNamespaceError: `All dependencies must be type
            env_module` if a dependency is not an `env_module` namespace.
        """
        self.type = None
        # Items to load: the path (`path`), the module strings with the
        # dependencies first (`env_module`) or the image reference (`docker`).
        self.list = []
        self.log = log
        namespace_list = program_dict['namespace'].split('@')
        if len(namespace_list) == 2:
            self.type, namespace_item = namespace_list
        elif len(namespace_list) == 1:
            self.type = 'path'
            namespace_item = namespace_list[0]
        else:
            self.log.error('Wrong Namespace format')
            raise CommandNamespaceError('Wrong Namespace format')
        self.log.info('Set namespace to %s' % self.type)

        supported_namespaces = ('path', 'env_module', 'docker')
        if self.type not in supported_namespaces:
            self.log.error(
                'Not supported namespace names %s' % self.type)
            raise CommandNamespaceError(
                'Not supported namespace %s' % self.type)

        if self.type == 'env_module':
            # Treat a missing key and an explicit None the same way.
            deps_list = program_dict.get('dependencies') or []
            for dep in deps_list:
                dep_nm = Namespace(profile.programs[dep], self.log, profile)
                if dep_nm.type != 'env_module':
                    self.log.error(
                        'All dependencies must be type env_module')
                    raise CommandNamespaceError(
                        'All dependencies must be type env_module')
                self.list += dep_nm.list
            self.list.append(
                program_string(
                    {
                        'namespace': namespace_item,
                        'version': program_dict['version']
                    }
                ))
        elif self.type == 'docker':
            version = program_dict['version']
            if os.path.basename(PYPE_DOCKER) == 'singularity':
                # For singularity the image is a `.sif` file located in
                # the singularity cache directory.
                nm = os.path.join(
                    PYPE_SINGULARITY_CACHE,
                    '%s_%s.sif' % (namespace_item, version))
            else:
                nm = '%s:%s' % (namespace_item, version)
            self.list.append(nm)
        elif self.type == 'path':
            self.list.append(namespace_item)

    def first(self):
        """
        Return the first item of :attr:`list`.

        For a `docker` namespace this is its only item, the image reference.

        :rtype: str
        """
        return self.list[0]


class Volume:
    """
    Binding of a host path into a container.

    Stores the host path to bind, its mount point inside the container and
    the access mode, and renders them as a command-line argument. The
    argument format can be switched for runtimes such as `udocker` (no mode
    suffix) and `singularity` (`--bind` syntax).
    """

    def __init__(self, path, output=False, bind_prefix='/var/lib/pype'):
        """
        Compute the container bind point of `path`.

        The bind point is `<bind_prefix>/<digest>/<basename>`, where
        `<digest>` is the SHA-256 hex digest of the current time (to the
        second) concatenated with `path`. Inputs are bound read-only as
        they are; outputs are bound read-write through their parent
        directory (presumably so the file can be created inside it).

        :param path: File or directory to bind in the host system
        :type path: str
        :param output: Set to `True` if `path` is an output target,
            defaults to `False`.
        :type output: bool, optional
        :param bind_prefix: Prefix path in the container environment,
            defaults to '/var/lib/pype'.
        :type bind_prefix: str, optional
        """
        self.path = path
        self.volume_str = '--volume=%(host_file)s:%(docker_file)s:%(mode)s'
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        digest = sha256(
            ('%s%s' % (stamp, path)).encode('utf-8')).hexdigest()
        if output is True:
            # Bind the parent directory; the file is addressed inside it.
            self.mode = 'rw'
            self.to_bind = os.path.dirname(path)
            self.bind_volume = os.path.join(
                bind_prefix, digest, os.path.basename(self.to_bind))
            self.bind_file = os.path.join(
                self.bind_volume, os.path.basename(path))
        else:
            self.mode = 'ro'
            self.to_bind = path
            self.bind_volume = os.path.join(
                bind_prefix, digest, os.path.basename(path))
            self.bind_file = self.bind_volume

    def remove_mode(self):
        """
        Drop the trailing access mode (eg. `:rw`) from the binding string.
        """
        self.volume_str = '--volume=%(host_file)s:%(docker_file)s'
        self.mode = ''

    def singularity_volume(self):
        """
        Switch the binding string to the `singularity` `--bind` syntax.
        """
        self.volume_str = '--bind %(host_file)s:%(docker_file)s:%(mode)s'

    def replace_bind_volume(self, bind_path):
        """
        Move the container bind point directly under `bind_path`.

        The basename of the current bind file is kept, and both
        :attr:`bind_file` and :attr:`bind_volume` point to the new location.
        This nests the binding inside the container directory of another
        volume whose host path contains this one.

        :param bind_path: Container directory that receives the binding
        :type bind_path: str
        """
        self.bind_volume = self.bind_file = os.path.join(
            bind_path, os.path.basename(self.bind_file))

    def replace_bind_dirname(self, bind_path):
        """
        Move the container bind point into the parent directory of
        `bind_path`.

        The basename of the current bind file is kept, and both
        :attr:`bind_file` and :attr:`bind_volume` point to the new location.
        This gives host paths sharing a folder a common container folder.

        :param bind_path: Bind point of another volume; its `dirname` is used
        :type bind_path: str
        """
        self.bind_volume = self.bind_file = os.path.join(
            os.path.dirname(bind_path), os.path.basename(self.bind_file))

    def to_str(self):
        """
        Render the bind argument for the container command line.

        :return: Bind volume string
        :rtype: str
        """
        values = {
            'host_file': self.to_bind,
            'docker_file': self.bind_volume,
            'mode': self.mode,
        }
        return self.volume_str % values


class Command:
    """
    High level class to use :class:`subprocess.Popen` combined with the
    :class:`Volume` and :class:`Namespace` classes.

    The :class:`Command` class is a wrapper around :class:`subprocess.Popen`
    that results in more succinct code, keeping the command lines that are
    going to be executed readable instead of buried in
    :class:`subprocess.Popen` boilerplate.
    """

    def __init__(self, cmd, log, profile, name=''):
        """
        Store the command line and prepare the state used to run it.

        :param cmd: Command line string, split with :func:`shlex.split`
        :type cmd: str
        :param log: Log object of the running snippet; its `log` attribute
            is the logger actually used.
        :type log: :class:`pype.logger.PypeLogger`
        :param profile: Profile used to resolve namespace dependencies
        :type profile: :class:`pype.utils.profiles.Profile`
        :param name: String used to identify the process in the log,
            defaults to ''
        :type name: str, optional
        """
        self.cmd = shlex.split(cmd)
        self.name = name
        self.profile = profile
        self.stdin = subprocess.PIPE
        self.stdout = subprocess.PIPE
        self.procin = None
        self.docker_exec = PYPE_DOCKER
        self.docker_runtime = os.path.basename(self.docker_exec)
        self.inputs = []
        self.outputs = []
        self.volumes = []
        # Stored as functions; called when the container command is built.
        self.uid = os.geteuid
        self.gid = os.getegid
        self.log = log.log
        # Random container working directory used by the singularity runtime.
        self.random_dir = '/' + ''.join(
            [random.choice(string.ascii_letters) for _ in range(6)])
        self.tmp = PYPE_TMP
        if self.name == '':
            self.log.warning('Proceeding with unnamed Command')

    def add_output(self, out_file):
        """
        Register a file written by the command.

        In a `docker` namespace each output is bound read-write through its
        parent directory. Already registered paths are ignored.

        :param out_file: Path of the output file
        :type out_file: str
        """
        if out_file not in self.outputs:
            self.outputs.append(out_file)

    def add_input(self, in_file, match='exact'):
        """
        Register a file read by the command.

        :param in_file: Path of the input file, or path prefix when
            `match` is `recursive`
        :type in_file: str
        :param match: `recursive` registers every existing path starting
            with `in_file` (eg. a file and its index files, matched in
            sorted order); any other value registers `in_file` only.
            Defaults to 'exact'. Already registered paths are ignored.
        :type match: str, optional
        """
        if match == 'recursive':
            from glob import escape
            # Escape glob metacharacters so only the literal prefix matches
            # (eg. `sample[1].bam` must not be read as a character class).
            for file_in in sorted(glob('%s*' % escape(str(in_file)))):
                self.add_input(file_in, 'exact')
        elif in_file not in self.inputs:
            self.inputs.append(in_file)

    def add_volume(self, path, output=False):
        """
        Create a :class:`Volume` for `path`, align its container bind point
        with the volumes registered so far, and register it.

        For every registered volume with the same mode:

        - if the registered host path is below `path`, the registered
          volume is moved under the new volume's bind point;
        - else if `path` is below the registered host path, the new volume
          is moved under the registered volume's bind point;
        - else if both share a parent directory, the new volume moves to
          the registered volume's container directory.

        The first two adjustments are skipped for the `singularity` runtime.

        :param path: Host path to bind
        :type path: str
        :param output: `True` if `path` is an output target, defaults to False
        :type output: bool, optional
        """
        is_singularity = self.docker_runtime == 'singularity'
        volume = Volume(path, output=output)
        for existing in self.volumes:
            if existing.mode != volume.mode:
                continue
            path_i = existing.to_bind
            if is_in_directory(path_i, path):
                if not is_singularity:
                    existing.replace_bind_volume(os.path.join(
                        volume.bind_volume,
                        os.path.dirname(os.path.relpath(path_i, path))))
            elif is_in_directory(path, path_i):
                if not is_singularity:
                    volume.replace_bind_volume(os.path.join(
                        existing.bind_volume,
                        os.path.dirname(os.path.relpath(path, path_i))))
            elif has_same_basedir(path, path_i):
                volume.replace_bind_dirname(existing.bind_volume)
        self.volumes.append(volume)

    def normalize_volumes(self):
        """
        Drop volumes whose host path or container bind point is already
        used by an earlier volume, keeping the first occurrence.
        """
        unique_volumes = []
        seen_to_bind = set()
        seen_bind_volumes = set()
        for volume in self.volumes:
            if volume.bind_volume in seen_bind_volumes \
                    or volume.to_bind in seen_to_bind:
                continue
            unique_volumes.append(volume)
            seen_to_bind.add(volume.to_bind)
            seen_bind_volumes.add(volume.bind_volume)
        self.volumes = unique_volumes

    def add_namespace(self, namespace):
        """
        Set the environment the command runs in.

        :param namespace: Program definition, as accepted by
            :class:`Namespace`
        :type namespace: dict
        """
        self.namespace = Namespace(namespace, self.log, self.profile)

    def docker(self, local_script):
        """
        Rewrite :attr:`cmd` to run inside the container of the namespace.

        Inputs and outputs become :class:`Volume` bindings, and every
        command-line argument equal to a bound host path is replaced with
        its path in the container. The runtime flavour (`udocker`,
        `singularity`, or anything else handled as `docker`) is chosen from
        the basename of the configured executable.

        :param local_script: If `True`, the first argument of the command is
            a local script: it is bound too, and its content is rewritten in
            place replacing host paths with container paths.
        :type local_script: bool
        :raises Exception: if no namespace was set with :meth:`add_namespace`
        """
        # Check first: fail before creating a temporary directory or
        # rewriting any script on disk.
        try:
            docker_image = self.namespace.first()
        except AttributeError:
            self.log.error('A namespace is required before '
                           'executing a Docker command')
            raise Exception('A namespace is required before '
                            'executing a Docker command')
        # NOTE(review): this directory is not removed by this class.
        docker_cwd = tempfile.mkdtemp(dir=self.tmp)
        docker_data = {
            'user': self.uid(),
            'group': self.gid(),
            'random_dir': self.random_dir,
            'docker': self.docker_exec,
            'cwd': docker_cwd,
        }
        if self.docker_runtime == 'udocker':
            docker_cmd = (
                '%(docker)s --quiet run -i --rm '
                '--user=%(user)i:%(group)i '
                '--workdir=/var/spool/pype '
                '--volume=%(cwd)s:/var/spool/pype') % docker_data
        elif self.docker_runtime == 'singularity':
            docker_cmd = (
                '%(docker)s --quiet exec '
                '--contain --pid --ipc '
                '--pwd %(random_dir)s '
                '--home %(cwd)s:%(random_dir)s') % docker_data
        else:
            docker_cmd = (
                '%(docker)s run -i --rm '
                '--user=%(user)i:%(group)i '
                '--workdir=/var/spool/pype '
                '--volume=%(cwd)s:/var/spool/pype:rw') % docker_data
        for in_file in self.inputs:
            self.add_volume(in_file)
        for out_file in self.outputs:
            self.add_volume(out_file, output=True)
        if local_script is True:
            exec_file = self.cmd[0]
            self.add_volume(exec_file)
            # NOTE: rewriting the code file of CodeChunk in place is a
            # stop-gap; wrapping CodeChunk inside Command could write the
            # container paths directly instead.
            self.replace_values_in_code(exec_file)
        self.normalize_volumes()
        cmd = copy(self.cmd)
        volumes_files = []
        for volume in self.volumes:
            # Compare against the original arguments; later volumes win.
            for i, arg in enumerate(self.cmd):
                if volume.path == arg:
                    cmd[i] = volume.bind_file
            if self.docker_runtime == 'udocker':
                volume.remove_mode()
            elif self.docker_runtime == 'singularity':
                volume.singularity_volume()
            volumes_files.append(volume.to_str())
        docker_cmd = '%s %s %s' % (
            docker_cmd, ' '.join(volumes_files), docker_image)
        docker_cmd = shlex.split(docker_cmd) + cmd
        self.log.info('Prepare Docker command %s' % ' '.join(docker_cmd))
        self.log.info('Replace %s with Docker command' % ' '.join(self.cmd))
        self.cmd = docker_cmd

    def run(self, local_script=False):
        """
        Prepare the namespace environment and start the process.

        A `docker` namespace rewrites the command line with :meth:`docker`.
        An `env_module` namespace loads each of its modules with the
        Environment Modules `module` function. Without a namespace the
        command runs as is (`path`). The process is started with
        :class:`subprocess.Popen` and not waited for; its stdout replaces
        :attr:`stdout`, so it can feed another :class:`Command`.

        :param local_script: Forwarded to :meth:`docker`, defaults to False
        :type local_script: bool, optional
        :raises EnvModulesError: if the `module` function cannot be loaded
        """
        try:
            namespace = self.namespace.type
        except AttributeError:
            self.log.warning('A namespace is required before '
                             'executing a command, continuing '
                             'with namespace="path"')
            namespace = 'path'
        if namespace == 'docker':
            self.docker(local_script)
        elif namespace == 'env_module':
            module = get_module_cmd()
            for nm in self.namespace.list:
                self.log.info('Add env_module %s' % nm)
                module('add', nm)
        if self.procin is not None:
            # The upstream command was already started by pipe_in(): close
            # this process's handle on the pipe feeding it, so broken pipes
            # propagate along the chain.
            self.procin.child_close()
        self.log.info('Prepare %s command line' % self.name)
        self.log.info(' '.join(map(str, self.cmd)))
        self.log.info(
            'Execute %s with python subprocess.Popen' % self.name)
        self.proc = subprocess.Popen(
            self.cmd, stdin=self.stdin, stdout=self.stdout)
        self.stdout = self.proc.stdout

    def pipe_in(self, command, local_script=False):
        """
        Start `command` and use its stdout as the stdin of this command.

        :param command: Upstream command
        :type command: :class:`Command`
        :param local_script: Forwarded to `command.run`, defaults to False
        :type local_script: bool, optional
        """
        self.log.info('Pipe in %s in %s command' % (
            command.name, self.name))
        self.procin = command
        self.procin.run(local_script)
        self.stdin = self.procin.stdout

    def child_close(self):
        """
        Close the stdout pipe of the command piped into this one, if any.

        Nothing happens (and nothing is raised) when there is no upstream
        command.
        """
        try:
            self.procin.stdout.close()
            self.log.info(
                'Close %s stdout stream' % self.procin.name)
        except AttributeError:
            pass

    def close(self):
        """
        Wait for the process to finish and collect its output.

        If a command was piped in, its stdout pipe is closed first.

        :return: The `(stdout_data, stderr_data)` tuple returned by
            :meth:`subprocess.Popen.communicate`; stderr is not captured.
        :rtype: tuple
        """
        if self.procin is not None:
            self.child_close()
        res = self.proc.communicate()
        self.log.info('Terminate %s' % self.name)
        return res

    def replace_values_in_code(self, code_file):
        """
        Rewrite `code_file` in place, replacing every bound host path with
        its path in the container, then make it executable (mode 0o760).

        Replacement is a single pass that prefers the longest matching path,
        so a path that prefixes another (`/data/in` vs `/data/in.bam`) does
        not corrupt it. Volumes with an empty path are skipped. If several
        volumes share a path, the first one is used.

        :param code_file: Path of the script to rewrite
        :type code_file: str
        """
        import re

        replacements = {}
        for volume in self.volumes:
            if volume.path:
                replacements.setdefault(volume.path, volume.bind_file)
        with open(code_file, 'rt') as code:
            code_text = code.read()
        if replacements:
            pattern = re.compile('|'.join(
                re.escape(host_path) for host_path in
                sorted(replacements, key=len, reverse=True)))
            code_text = pattern.sub(
                lambda match: replacements[match.group(0)], code_text)
        with open(code_file, 'wt') as code:
            code.write(code_text)
        os.chmod(code_file, 0o760)