API

Utils

Pipelines

class pype.utils.arguments.BatchFileArgument(argument)[source]

BatchFileArgument reads the arguments from a file and returns the list of arguments. It is required for the execution of a batch snippet or batch pipeline.

class pype.utils.arguments.BatchListArgument(argument)[source]

BatchListArgument reads the arguments from a file and returns the list of arguments. It is required for the execution of a batch snippet or batch pipeline.

class pype.utils.arguments.CompositeArgument(argument)[source]

A CompositeArgument retrieves the results from the results method of the specified snippet. It is not listed in the arguments help message, so its value is None. It contains a PipelineItemArguments object, defining the arguments to pass to the results method of the snippet.

class pype.utils.arguments.ConstantArgument(argument)[source]

xxxx

class pype.utils.arguments.PipelineItemArguments[source]

An object that gathers the arguments of a PipelineItem.

This is meant to collect the structure and the type of the arguments defined in a pipeline yaml file.

add_argument(argument, argument_type='argv_arg')[source]

Add the appropriate Argument class to the PipelineItemArguments argument list

Parameters:
  • argument (dict) – An item from the list of arguments in the pipeline yaml file. It should contain the keys prefix and pipeline_arg. The key prefix indicates the flag used by the snippet/pipeline that the PipelineItem is configured to execute. The key pipeline_arg indicates the keyword or object that the pipeline engine needs to interpret to convert it into arguments and to construct the command line interface.
  • argument_type (str) – The type of argument; this parameter selects which argument class is used to parse the argument. Possible choices are composite_arg, batch_list_arg and argv_arg. Defaults to argv_arg.
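
To make the expected shape concrete, the sketch below checks an argument dictionary and an argument_type value against the description above. The helper check_argument, the flag -i and the placeholder value are hypothetical and not part of pype; only the two key names and the three type names come from this page.

```python
# Illustrative only: check_argument is a hypothetical helper, not a pype function.
ARGUMENT_TYPES = ("composite_arg", "batch_list_arg", "argv_arg")
REQUIRED_KEYS = ("prefix", "pipeline_arg")


def check_argument(argument, argument_type="argv_arg"):
    """Return argument_type if the argument dict has the documented shape."""
    missing = [key for key in REQUIRED_KEYS if key not in argument]
    if missing:
        raise KeyError("missing keys: %s" % ", ".join(missing))
    if argument_type not in ARGUMENT_TYPES:
        raise ValueError("unknown argument type: %s" % argument_type)
    return argument_type


# Hypothetical item, as it might look once loaded from a pipeline yaml file.
example = {"prefix": "-i", "pipeline_arg": "%(input_file)s"}
selected = check_argument(example)
```
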
to_dict(args_dict=None)[source]

Converts the arguments in the PipelineItemArguments into dictionaries similar to those used by argparse

Example

Queues

class pype.utils.queues.SnippetRuntime(command, log, profile)[source]

A class to help build queue module implementations for bio_pype.

A helper class that generalizes various tasks needed to build queue modules and, in the meantime, creates a yaml file that records running jobs and job dependencies, agnostic of the underlying queueing system used.

Parameters:
  • command (str) – The snippet name with valid arguments
  • log (pype.logger.PypeLogger) – Log object of the main pipeline
  • profile (str) – The name of the selected profile

A usage example of this class is the following implementation of the pbs (torque) queue system:

../test/data/queues/pbs.py
import os
import datetime
from pype.utils.queues import SnippetRuntime


def submit(command, snippet_name, requirements, dependencies, log, profile):
    runtime = SnippetRuntime(command, log, profile)
    runtime.get_runtime(requirements, dependencies)
    queue_dependencies = runtime.queue_depends()
    stdout = os.path.join(log.__path__, 'stdout')
    stderr = os.path.join(log.__path__, 'stderr')
    stdout_pbs = os.path.join(log.__path__, 'stdout.pbs')
    stderr_pbs = os.path.join(log.__path__, 'stderr.pbs')

    now = datetime.datetime.now()
    now_plus_10 = now + datetime.timedelta(minutes=10)
    startime_str = now_plus_10.strftime("%H%M.%S")

    log.log.info('Executing qsub in working directory %s' % os.getcwd())
    log.log.info('Redirect stdout/stderr to folder %s' % log.__path__)
    command = '''#!/bin/bash
    exec 1>%s
    exec 2>%s
    exec %s''' % (stdout, stderr, runtime.command)
    log.log.info('Retrieve custom group environment variable')
    largs = []

    if len(queue_dependencies) > 0:
        cmd_dependencies = [
            'afterok:%s' % dep for dep in queue_dependencies]
        depend = ['-W', 'depend=%s' % ','.join(cmd_dependencies)]
        largs += depend
    if 'time' in requirements.keys():
        time = ['-l', 'walltime=%s' % requirements['time']]
        largs += time
    if 'mem' in requirements.keys():
        mem = ['-l', 'mem=%s' % requirements['mem']]
        largs += mem
    if 'type' in requirements.keys():
        if requirements['type'] == 'exclusive':
            exclusive = ['-l', 'naccesspolicy=singlejob']
            largs += exclusive
    if 'ncpu' in requirements.keys():
        try:
            nodes = int(requirements['nodes'])
        except KeyError:
            nodes = 1
        cpus = ['-l', 'nodes=%i:ppn=%i' % (nodes, int(requirements['ncpu']))]
        largs += cpus
    qsub_group = os.environ.get('PYPE_QUEUE_GROUP')
    if qsub_group:
        log.log.info('Custom qsub group set to %s' % qsub_group)
        largs += ['-W', 'group_list=%s' % qsub_group, '-A', qsub_group]
    else:
        log.log.info('Custom qsub group not set')
    echo = 'echo \'%s\'' % command
    qsub = [
        'qsub', '-V', '-o', stdout_pbs, '-e', stderr_pbs,
        '-d', os.getcwd(), '-a', startime_str, '-N', snippet_name] + largs
    runtime.add_queue_commands(
        [echo, ' '.join(qsub)])
    runtime.submit_queue(5)
    runtime.commit_runtime()
    return runtime.run_id


def post_run(log):
    log.log.info('Done')
add_queue_commands(commands)[source]

Add the list of commands to launch the job in the queue system.

The commands will be run in a pipe, so the output of the first item in the command list will be stdin of the second item, and so on.

Parameters:commands (list) – List of strings with the commands
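
The piping behaviour described above can be pictured with plain subprocess calls. This is a minimal sketch, assuming each command is a simple shell-style string; run_piped is a hypothetical helper and does not show how SnippetRuntime runs the commands internally.

```python
import shlex
import subprocess


def run_piped(commands):
    """Run the commands so that each one reads the previous one's stdout."""
    if not commands:
        raise ValueError("at least one command is required")
    previous = None
    processes = []
    for command in commands:
        proc = subprocess.Popen(
            shlex.split(command),
            stdin=previous.stdout if previous else None,
            stdout=subprocess.PIPE)
        if previous is not None:
            # Close our copy so the upstream process sees a closed pipe
            # if the downstream process exits early.
            previous.stdout.close()
        processes.append(proc)
        previous = proc
    output, _ = previous.communicate()
    for proc in processes[:-1]:
        proc.wait()
    return output.decode()


# Two commands: the output of echo becomes the stdin of tr.
result = run_piped(["echo hello", "tr a-z A-Z"])
```

In the pbs example above the same idea is used: the first command echoes the job script and the second command (qsub) reads it from stdin.
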
add_queue_id(queue_id)[source]

Add a job ID for the snippet.

This is useful when the queue command is not submitted using SnippetRuntime.submit_queue(), so the job id is not automatically registered in the runtime object.

Parameters:queue_id (str) – Job id string
change_sleep(sleep_sec)[source]

Change the number of seconds to wait after submitting a job in the queue system.

It is used in SnippetRuntime.submit_queue(). It alters the attribute SnippetRuntime.sleep

Parameters:sleep_sec (int) – Number of seconds
commit_runtime()[source]

Save the runtime dictionary in the pipeline_runtime.yaml file

The pipeline_runtime.yaml file is stored in the parent directory of the snippet log.

get_runtime(requirements, dependencies)[source]

Load the runtime object; if it does not exist, initiate a new runtime dictionary.

Parameters:
  • requirements (dict) – Dictionary specifying the snippet requirements
  • dependencies (list) – List of ids of the other snippets on which this snippet depends (it will run if/when the other jobs have terminated)
queue_depends()[source]

Returns the list of queue ids on which this command depends.

The list stored in the dependencies key of the runtime dictionary consists of unique ids of the runtime object; this method simply converts the runtime ids into queue ids.

Returns:Queue id dependency list
Return type:list
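
The conversion from runtime ids to queue ids can be sketched with a plain dictionary. The layout below (the run ids, the queue_id key and the id values) is hypothetical; only the dependencies key is named in the text above, and the real content of pipeline_runtime.yaml may differ.

```python
# Hypothetical runtime dictionary, for illustration only.
runtime = {
    "run_a": {"queue_id": "1001.pbs", "dependencies": []},
    "run_b": {"queue_id": "1002.pbs", "dependencies": []},
    "run_c": {"queue_id": None, "dependencies": ["run_a", "run_b"]},
}


def queue_ids_for(runtime, run_id):
    """Translate the runtime ids a job depends on into queue ids."""
    return [runtime[dep]["queue_id"] for dep in runtime[run_id]["dependencies"]]


depends = queue_ids_for(runtime, "run_c")
# Same formatting step used by the pbs example for the qsub -W option.
depend_option = "depend=%s" % ",".join("afterok:%s" % dep for dep in depends)
```
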
submit_queue(retry=1)[source]

Execute the queue commands, and add the resulting job id in the runtime dictionary.

The method accepts a number of retry attempts: on failure, the submission is repeated up to the specified number of times before failing the pipeline.

Parameters:retry (int, optional) – Number of attempts before failing, defaults to 1
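
The retry behaviour can be sketched as a small loop. This is an illustration under stated assumptions, not the code of submit_queue: submit_with_retry and flaky_submit are hypothetical, and the choice of RuntimeError as the failure signal is made up for the example.

```python
import time


def submit_with_retry(submit, retry=1, sleep_sec=0):
    """Call submit() up to `retry` times and return the first job id obtained."""
    last_error = None
    for _ in range(retry):
        try:
            job_id = submit()
        except RuntimeError as error:
            last_error = error  # remember the failure and try again
            continue
        time.sleep(sleep_sec)  # pause after a successful submission
        return job_id
    raise RuntimeError("submission failed after %i attempts" % retry) from last_error


attempts = []


def flaky_submit():
    """Hypothetical submit function that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("queue not reachable")
    return "1003.pbs"


job_id = submit_with_retry(flaky_submit, retry=5)
```
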

Snippets and Profiles

Process

class pype.process.Command(cmd, log, profile, name='')[source]

High level class to use subprocess.Popen combined with Volume and Namespace classes.

The Command class is a wrapper around subprocess.Popen that results in more succinct code, making the command lines that are going to be executed more readable than the subprocess.Popen boilerplate would.

The class initialization requires the command line string, a Profile object and a log object (e.g. the snippet log object).

Parameters:
  • cmd (str) – Command line string
  • log (pype.logger.PypeLogger) – Log class of the running snippet
  • profile (pype.utils.profiles.Profile) – A Profile object
  • name (str, optional) – String used to identify the process in the log, defaults to ‘’
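
To illustrate the kind of subprocess.Popen boilerplate that a wrapper can hide, here is a deliberately small sketch. MiniCommand is a hypothetical class written for this example: it takes no log or profile, knows nothing about volumes or namespaces, and its return values are not those of pype.process.Command.

```python
import shlex
import subprocess


class MiniCommand:
    """Hypothetical, stripped-down Popen wrapper (not the pype class)."""

    def __init__(self, cmd, name=""):
        self.cmd = shlex.split(cmd)
        self.name = name or self.cmd[0]
        self.proc = None

    def run(self):
        """Start the process, capturing stdout and stderr."""
        self.proc = subprocess.Popen(
            self.cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def close(self):
        """Wait for the process and return (exit code, stdout, stderr)."""
        stdout, stderr = self.proc.communicate()
        return self.proc.returncode, stdout.decode(), stderr.decode()


greet = MiniCommand("echo hello", name="greet")
greet.run()
code, out, err = greet.close()
```
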
add_input(in_file, match='exact')[source]

The match argument can be either exact or recursive:

  • exact will match only the specified file
  • recursive will match all the files with the same prefix as the specified file

Parameters:
  • in_file ([type]) – [description]
  • match (str, optional) – Matching mode, either exact or recursive, defaults to ‘exact’
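
The difference between the two match modes can be sketched as follows. The helper match_inputs is hypothetical, and reading "same prefix" as "every path that starts with the given file path" is an assumption made for this example.

```python
import glob
import os
import tempfile


def match_inputs(in_file, match="exact"):
    """Return the files selected by the exact or recursive mode."""
    if match == "exact":
        return [in_file]
    if match == "recursive":
        # Assumption: every path starting with in_file is a match.
        return sorted(glob.glob(glob.escape(in_file) + "*"))
    raise ValueError("match must be 'exact' or 'recursive'")


with tempfile.TemporaryDirectory() as tmp:
    for name in ("sample.bam", "sample.bam.bai", "other.bam"):
        open(os.path.join(tmp, name), "w").close()
    bam = os.path.join(tmp, "sample.bam")
    exact = [os.path.basename(p) for p in match_inputs(bam)]
    recursive = [os.path.basename(p) for p in match_inputs(bam, "recursive")]
```

With these made-up file names, the recursive mode also picks up the index file that sits next to sample.bam, while other.bam is ignored in both modes.
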
add_namespace(namespace)[source]

[summary]

[extended_summary]

add_output(out_file)[source]

[summary]

[extended_summary]

Parameters:out_file ([type]) – [description]
add_volume(path, output=False)[source]

[summary]

[extended_summary]

Parameters:
  • path ([type]) – [description]
  • output (bool, optional) – [description], defaults to False
child_close()[source]

[summary]

[extended_summary]

close()[source]

[summary]

[extended_summary]

Returns:[description]
Return type:[type]
docker(local_script)[source]

[summary]

[extended_summary]

Raises:Exception – [description]
pipe_in(command, local_script=False)[source]

[summary]

[extended_summary]

Parameters:
  • command ([type]) – [description]
  • local_script (bool, optional) – [description], defaults to False
replace_values_in_code(code_file)[source]

[summary]

[extended_summary]

Parameters:code_file ([type]) – [description]
run(local_script=False)[source]

[summary]

[extended_summary]

Parameters:local_script (bool, optional) – [description], defaults to False
class pype.process.Namespace(program_dict, log, profile)[source]

A mechanism to load different environments

Define a basic abstraction layer to load programs and environments to the Command class

Parameters:
  • program_dict (dict) – A dictionary with the keys namespace, version and dependencies. namespace is a string composed of the namespace type and the namespace item, separated by the @ character. The supported namespace types are docker, env_modules and path. The namespace item is a string relevant to the namespace type (e.g. the docker container repository url). version is a string defining the tag/version of the docker container or the version of the program to load (again, depending on the namespace type selected). dependencies is only used for the env_modules namespace type and lists other environment modules to load to satisfy the loading dependencies.
  • log (pype.logger.PypeLogger) – Log object to append logging in the snippet log file
  • profile (pype.utils.profiles.Profile) – Profile object
Raises:
  • SnippetNamespaceError – Wrong Namespace format, if the namespace contains more than one @ character.
  • SnippetNamespaceError – Not supported namespace, if the namespace type is not docker, env_modules or path.
  • SnippetNamespaceError – All dependencies must be type env_module, if any of the dependencies defined in the dependencies key is not a namespace of the env_modules type.
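
The type@item format and the first two error conditions can be sketched as below. split_namespace and NamespaceFormatError are hypothetical stand-ins, the container name and version are made up, and rejecting a string with no @ at all is an assumption of this sketch rather than documented behaviour.

```python
SUPPORTED_TYPES = ("docker", "env_modules", "path")


class NamespaceFormatError(ValueError):
    """Hypothetical stand-in for the error raised by pype."""


def split_namespace(namespace):
    """Split 'type@item' into its two parts, validating the type."""
    parts = namespace.split("@")
    if len(parts) != 2:
        raise NamespaceFormatError("Wrong Namespace format: %s" % namespace)
    namespace_type, item = parts
    if namespace_type not in SUPPORTED_TYPES:
        raise NamespaceFormatError("Not supported namespace: %s" % namespace_type)
    return namespace_type, item


# Hypothetical program_dict; the container name and version are made up.
program_dict = {
    "namespace": "docker@example/samtools",
    "version": "1.0",
}
namespace_type, item = split_namespace(program_dict["namespace"])
```
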
class pype.process.Volume(path, output=False, bind_prefix='/var/lib/pype')[source]

Volume class to abstract and parametrize the binding of files while running commands in containerized environments.

The class also contains methods to adjust the bind volume argument to implementations such as udocker and singularity.

Initialize the class by defining the path in the host environment and the prefix in the container environment, and by flagging whether the path is an input or an output target.

Parameters:
  • path (str) – File or directory to bind in the host system
  • output (bool, optional) – Set to True if path is an output target, defaults to False.
  • bind_prefix (str, optional) – Prefix path in the container environment, defaults to ‘/var/lib/pype’.
remove_mode()[source]

Removes the trailing mode (e.g. the ending :rw) from the binding string.

replace_bind_dirname(bind_path)[source]

Replaces the bind volume in the container environment with the dirname of the specified bind path.

This is useful to give the same binding point to multiple paths (defined in multiple Volume classes) that are in the same folder in the host system.

Parameters:bind_path (str) – Binding point that replaces the current one, which is randomly generated by the class.
replace_bind_volume(bind_path)[source]

Replaces the bind volume in the container environment with the specified bind path.

This is useful to manage binding points for multiple paths (defined in multiple Volume classes) that are subfolders of another bind volume in the host system.

Parameters:bind_path (str) – Binding point that replaces the current one, which is randomly generated by the class.
singularity_volume()[source]

Format the volume binding string following the singularity command line syntax.

to_str()[source]

Returns a string with the bind volume argument relative to the content of the class

Returns:Bind volume string
Return type:str
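
As a rough picture of what a bind volume string looks like, the sketch below builds a host:container:mode string and strips its mode. Everything here is an assumption made for illustration: bind_string and strip_mode are hypothetical functions, and binding the parent folder under a random name, as well as the ro/rw choice, may not match what the Volume class actually does.

```python
import os
import uuid


def bind_string(path, output=False, bind_prefix="/var/lib/pype"):
    """Build a 'host:container:mode' string for the folder holding path."""
    host_dir = os.path.dirname(os.path.abspath(path))
    # Assumption: a random folder name under the prefix is the binding point.
    bind_dir = "%s/%s" % (bind_prefix.rstrip("/"), uuid.uuid4().hex)
    mode = "rw" if output else "ro"
    return "%s:%s:%s" % (host_dir, bind_dir, mode)


def strip_mode(bind):
    """Drop the trailing ':rw' or ':ro' from a bind string."""
    return bind.rsplit(":", 1)[0]


bind = bind_string("/data/run1/sample.bam", output=True)
without_mode = strip_mode(bind)
```
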

Misc

class pype.misc.DefaultHelpParser(prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=<class 'argparse.HelpFormatter'>, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True, allow_abbrev=True)[source]
error(message: string)[source]

Prints a usage message incorporating the message to stderr and exits.

If you override this in a subclass, it should not return – it should either exit or raise an exception.
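
A common way to override error in an argparse.ArgumentParser subclass is to print the full help text before exiting. The sketch below shows that general pattern; HelpOnErrorParser is a hypothetical class and is not claimed to be the implementation of DefaultHelpParser.

```python
import argparse
import sys


class HelpOnErrorParser(argparse.ArgumentParser):
    """Hypothetical subclass: print the full help when parsing fails."""

    def error(self, message):
        sys.stderr.write("error: %s\n" % message)
        self.print_help(sys.stderr)
        sys.exit(2)  # exit instead of returning, as the note above requires


parser = HelpOnErrorParser(prog="demo")
parser.add_argument("--input", required=True)

try:
    parser.parse_args([])  # the required --input flag is missing
    exit_code = 0
except SystemExit as stop:
    exit_code = stop.code
```
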

class pype.misc.SubcommandHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]
pype.misc.xopen(filename, mode='r')[source]

Wrap around open/gzip.open and stdin/out.

Replacement for the “open” function that can also open files that have been compressed with gzip. If the filename ends with .gz, the file is opened with gzip.open(). If it doesn’t, the regular open() is used. If the filename is ‘-’, standard output (mode ‘w’) or input (mode ‘r’) is returned.
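
The dispatch described above can be sketched in a few lines. xopen_sketch is a hypothetical re-implementation written for this page, not the source of pype.misc.xopen; in particular, opening gzip files in text mode by default is an assumption.

```python
import gzip
import os
import sys
import tempfile


def xopen_sketch(filename, mode="r"):
    """Open plain files, gzip files or the standard streams by name."""
    if filename == "-":
        return sys.stdin if "r" in mode else sys.stdout
    if filename.endswith(".gz"):
        # Assumption: default to text mode unless a mode flag is given.
        text_mode = mode if ("b" in mode or "t" in mode) else mode + "t"
        return gzip.open(filename, text_mode)
    return open(filename, mode)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "reads.txt.gz")
    with xopen_sketch(path, "w") as handle:
        handle.write("ACGT\n")
    with xopen_sketch(path) as handle:
        content = handle.read()
    with open(path, "rb") as raw:
        magic = raw.read(2)  # gzip files start with the bytes 1f 8b
```
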

class pype.binfmisc.fastq(f, n=-1)[source]

Fastq iterator to extract name, sequence and quality of each read.

Specify the file object to iterate.

Parameters:
  • f (File) – fastq file
  • n (int, optional) – number of reads to evaluate, defaults to -1
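
The iteration over name, sequence and quality can be sketched with a generator over four-line records. iter_fastq is a hypothetical function, not the pype.binfmisc.fastq class; stripping the leading @ from the name and treating n=-1 as "all reads" are assumptions of this sketch.

```python
import io


def iter_fastq(handle, n=-1):
    """Yield (name, sequence, quality) for each four-line FASTQ record."""
    count = 0
    while n < 0 or count < n:
        header = handle.readline().rstrip("\n")
        if not header:
            break  # end of file
        sequence = handle.readline().rstrip("\n")
        handle.readline()  # skip the '+' separator line
        quality = handle.readline().rstrip("\n")
        yield header[1:], sequence, quality
        count += 1


data = io.StringIO("@read1\nACGT\n+\nIIII\n@read2\nTTGA\n+\nHHHH\n")
reads = list(iter_fastq(data))
data.seek(0)
first_only = list(iter_fastq(data, n=1))
```
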

Logging/Exceptions

exception pype.exceptions.CommandNamespaceError[source]
exception pype.exceptions.EnvModulesError[source]
exception pype.exceptions.PipelineError[source]
exception pype.exceptions.PipelineItemError[source]
exception pype.exceptions.ProfileError[source]
exception pype.exceptions.SnippetError[source]