API¶
Utils¶
Pipelines¶
-
class
pype.utils.arguments.
BatchFileArgument
(argument)[source]¶ BatchFileArgument reads the arguments from a file and returns the list of arguments. It is required for the execution of a batch snippet or batch pipeline.
-
class
pype.utils.arguments.
BatchListArgument
(argument)[source]¶ BatchArgument reads the arguments from a file and returns the list of arguments. It is required for the execution of a batch snippet or batch pipeline.
-
class
pype.utils.arguments.
CompositeArgument
(argument)[source]¶ A CompositeArgument retrieves the results from the results method of the specified snippet. It is not listed in the arguments help message, so its value is None. It contains a
PipelineItemArguments
object, defining the argument to pass to the results method of the snippet.
-
class
pype.utils.arguments.
PipelineItemArguments
[source]¶ An object to gather the
Argument
of a PipelineItem.
This is meant to collect the structure and the type of the arguments defined in a pipeline yaml file.
-
add_argument
(argument, argument_type='argv_arg')[source]¶ Add the appropriate
Argument
class to the PipelineItemArguments
argument list.
Parameters: - argument (dict) – An item from the list of arguments in the pipeline
yaml file. It should contain the keys prefix and pipeline_arg.
The key prefix indicates the flag used in the snippet/pipeline
that the
PipelineItem
is configured to execute. The key pipeline_arg indicates the keyword or object that the pipeline engine needs to interpret to convert into arguments and to construct the command line interface.
- argument_type (str) – The type of argument; this parameter selects which argument class is used to parse the argument. Possible choices are composite_arg, batch_list_arg and argv_arg. Default: argv_arg.
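The selection by argument_type described above can be pictured with a small dispatch table. This is only a sketch: the three stand-in classes, the ARGUMENT_CLASSES table, the make_argument helper and the placeholder value '%(input_file)s' are all hypothetical and do not reproduce pype's implementation. Only the keys prefix and pipeline_arg and the three argument_type values come from the description.

```python
# Hypothetical stand-ins; the real classes live in pype.utils.arguments.
class ArgvArg:
    def __init__(self, argument):
        self.prefix = argument["prefix"]
        self.value = argument["pipeline_arg"]


class CompositeArg(ArgvArg):
    pass


class BatchListArg(ArgvArg):
    pass


# Assumed mapping between the documented argument_type values and a class.
ARGUMENT_CLASSES = {
    "argv_arg": ArgvArg,
    "composite_arg": CompositeArg,
    "batch_list_arg": BatchListArg,
}


def make_argument(argument, argument_type="argv_arg"):
    """Pick the class for argument_type and build it from the yaml item."""
    for key in ("prefix", "pipeline_arg"):
        if key not in argument:
            raise KeyError("argument item is missing the key %r" % key)
    if argument_type not in ARGUMENT_CLASSES:
        raise ValueError("unknown argument_type %r" % argument_type)
    return ARGUMENT_CLASSES[argument_type](argument)


# One item as it might appear in the arguments list of a pipeline yaml file.
item = {"prefix": "-i", "pipeline_arg": "%(input_file)s"}
parsed = make_argument(item)
batch = make_argument(item, "batch_list_arg")
```

Keeping the mapping in one dictionary means an unsupported argument_type fails early with a clear message instead of surfacing later during parsing.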
-
to_dict
(args_dict=None)[source]¶ Converts the arguments in the
PipelineItemArguments
into dictionaries similar to argparse.
-
Queues¶
-
class
pype.utils.queues.
SnippetRuntime
(command, log, profile)[source]¶ A class to help build queue module implementations for bio_pype.
A helper class that generalizes various tasks needed to build queue modules and, in the meantime, creates a yaml file that records running jobs and job dependencies, agnostic of the underlying queueing system.
Parameters:
A usage example of this class is the following implementation of the pbs (torque) queue system:
import os
import datetime
from pype.utils.queues import SnippetRuntime


def submit(command, snippet_name, requirements, dependencies, log, profile):
    # Load (or create) the runtime record and resolve queue dependencies
    runtime = SnippetRuntime(command, log, profile)
    runtime.get_runtime(requirements, dependencies)
    queue_dependencies = runtime.queue_depends()
    stdout = os.path.join(log.__path__, 'stdout')
    stderr = os.path.join(log.__path__, 'stderr')
    stdout_pbs = os.path.join(log.__path__, 'stdout.pbs')
    stderr_pbs = os.path.join(log.__path__, 'stderr.pbs')
    # Ask the queue to start the job 10 minutes from now
    now = datetime.datetime.now()
    now_plus_10 = now + datetime.timedelta(minutes=10)
    startime_str = now_plus_10.strftime("%H%M.%S")
    log.log.info('Execution qsub into working directory %s' % os.getcwd())
    log.log.info('Redirect stdout/stderr to folder %s' % log.__path__)
    command = '''#!/bin/bash
exec 1>%s
exec 2>%s
exec %s''' % (stdout, stderr, runtime.command)
    log.log.info('Retrieve custom group environment variable')
    # Translate the snippet requirements into qsub options
    largs = []
    if len(queue_dependencies) > 0:
        cmd_dependencies = [
            'afterok:%s' % dep for dep in queue_dependencies]
        depend = ['-W', 'depend=%s' % ','.join(cmd_dependencies)]
        largs += depend
    if 'time' in requirements.keys():
        time = ['-l', 'walltime=%s' % requirements['time']]
        largs += time
    if 'mem' in requirements.keys():
        mem = ['-l', 'mem=%s' % requirements['mem']]
        largs += mem
    if 'type' in requirements.keys():
        if requirements['type'] == 'exclusive':
            exclusive = ['-l', 'naccesspolicy=singlejob']
            largs += exclusive
    if 'ncpu' in requirements.keys():
        try:
            nodes = int(requirements['nodes'])
        except KeyError:
            nodes = 1
        cpus = ['-l', 'nodes=%i:ppn=%i' % (nodes, int(requirements['ncpu']))]
        largs += cpus
    qsub_group = os.environ.get('PYPE_QUEUE_GROUP')
    if qsub_group:
        log.log.info('Custom qsub group set to %s' % qsub_group)
        largs += ['-W', 'group_list=%s' % qsub_group, '-A', qsub_group]
    else:
        log.log.info('Custom qsub group not set')
    # The script is echoed into qsub through a pipe
    echo = 'echo \'%s\'' % command
    qsub = [
        'qsub', '-V', '-o', stdout_pbs, '-e', stderr_pbs,
        '-d', os.getcwd(), '-a', startime_str,
        '-N', snippet_name] + largs
    runtime.add_queue_commands(
        [echo, ' '.join(qsub)])
    runtime.submit_queue(5)
    runtime.commit_runtime()
    return(runtime.run_id)


def post_run(log):
    log.log.info('Done')
-
add_queue_commands
(commands)[source]¶ Add the list of commands to launch the job in the queue system.
The commands will be run in a pipe, so the output of the first item in the command list will be stdin of the second item, and so on.
Parameters: commands (list) – List of strings with the commands
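The pipe behaviour described above can be reproduced with the standard library alone. The sketch below is not taken from pype: run_piped is a hypothetical helper that chains a list of command strings with subprocess.Popen, so that the output of each command becomes the stdin of the next. It assumes a POSIX system where echo and tr are available.

```python
import shlex
import subprocess


def run_piped(commands):
    """Run each command string, feeding stdout of one into stdin of the next."""
    previous = None
    processes = []
    for command in commands:
        proc = subprocess.Popen(
            shlex.split(command),
            stdin=previous.stdout if previous else None,
            stdout=subprocess.PIPE)
        if previous is not None:
            # Close our copy so the upstream process sees a closed pipe
            # if the downstream process exits early.
            previous.stdout.close()
        processes.append(proc)
        previous = proc
    output, _ = previous.communicate()
    for proc in processes[:-1]:
        proc.wait()
    return output.decode()


# Same shape as the [echo, qsub] pair used in the pbs example.
piped_output = run_piped(["echo hello queue", "tr a-z A-Z"])
```

In the pbs example the first command echoes the job script and the second is the qsub call that reads the script from stdin.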
-
add_queue_id
(queue_id)[source]¶ Add a job ID for the snippet.
This is useful when the queue command is not submitted using
SnippetRuntime.submit_queue()
, so the job id is not automatically registered in the runtime object.
Parameters: queue_id (str) – Job id string
-
change_sleep
(sleep_sec)[source]¶ Change the number of seconds to wait after submitting a job in the queue system.
It is used in
SnippetRuntime.submit_queue()
. It alters the attribute SnippetRuntime.sleep.
Parameters: sleep_sec (int) – Number of seconds
-
commit_runtime
()[source]¶ Save the runtime dictionary in the pipeline_runtime.yaml file.
The path of pipeline_runtime.yaml is the parent directory of the snippet log.
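To make the location rule concrete, here is a minimal sketch that computes where the file would be written. It assumes the snippet log is a folder (as log.__path__ is used in the pbs example); runtime_file_path and the example path are hypothetical and are not part of pype.

```python
import os


def runtime_file_path(snippet_log_dir):
    """Return pipeline_runtime.yaml in the parent of the snippet log folder."""
    # normpath drops a trailing separator so dirname returns the parent.
    parent = os.path.dirname(os.path.normpath(snippet_log_dir))
    return os.path.join(parent, "pipeline_runtime.yaml")


# Hypothetical log folder of one snippet inside a pipeline log folder.
runtime_path = runtime_file_path("/tmp/pipeline_logs/snippet_a/")
```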
-
get_runtime
(requirements, dependencies)[source]¶ Load the runtime object; if it does not exist, initiate a new runtime dictionary.
Parameters:
-
queue_depends
()[source]¶ Returns the list of queue ids on which this command depends.
The list stored in the runtime dictionary under the key dependencies consists of unique ids of the runtime object; this method simply converts the runtime ids into queue ids.
Returns: Queue id dependency list Return type: list
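The conversion from runtime ids to queue ids can be sketched as a lookup in the runtime dictionary. The dictionary layout below is an assumption made for illustration: only the dependencies key is named in the description, while the queue_id key, the run ids and the queue_ids_for helper are hypothetical.

```python
def queue_ids_for(runtime, run_id):
    """Translate the runtime ids a job depends on into queue ids."""
    queue_ids = []
    for dependency in runtime[run_id]["dependencies"]:
        queue_id = runtime[dependency].get("queue_id")
        if queue_id is not None:
            queue_ids.append(queue_id)
    return queue_ids


# Hypothetical runtime dictionary; key names other than "dependencies"
# are assumptions of this sketch.
runtime = {
    "run_a": {"queue_id": "1001.server", "dependencies": []},
    "run_b": {"queue_id": "1002.server", "dependencies": []},
    "run_c": {"queue_id": None, "dependencies": ["run_a", "run_b"]},
}
depends = queue_ids_for(runtime, "run_c")
```

In the pbs example, a list like this is what gets turned into the afterok entries of the qsub depend option.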
-
submit_queue
(retry=1)[source]¶ Execute the queue commands, and add the resulting job id in the runtime dictionary.
The method accepts a number of retry attempts: in case of failure, the submission is repeated up to the specified number of times before the pipeline fails.
Parameters: retry (int, optional) – Number of attempts before failing, defaults to 1
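The retry behaviour can be illustrated with a generic loop. This is a sketch of the idea only, not the code of submit_queue: submit_with_retry, flaky_submit and the use of RuntimeError as the failure signal are all assumptions.

```python
def submit_with_retry(submit, retry=1):
    """Call submit() up to retry times and return the first job id."""
    last_error = None
    for _ in range(retry):
        try:
            return submit()
        except RuntimeError as error:
            # Remember the failure and try again until attempts run out.
            last_error = error
    raise RuntimeError(
        "submission failed after %i attempts" % retry) from last_error


attempts = []


def flaky_submit():
    """Hypothetical submission that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("queue not reachable")
    return "1234.server"


job_id = submit_with_retry(flaky_submit, retry=5)
```

With retry=5, as in the pbs example, two transient failures are absorbed and the third attempt provides the job id.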
-
Snippets and Profiles¶
Process¶
-
class
pype.process.
Command
(cmd, log, profile, name='')[source]¶ High level class to use
subprocess.Popen
combined with the Volume
and Namespace
classes.
The
Command
class is a wrapper around subprocess.Popen
that results in more succinct code, making the command lines that are going to be executed more readable than the subprocess.Popen
boilerplate.
The class initialization requires the command line string, a
Profile
class and a log object (e.g. the snippet log object).
Parameters:
-
add_input
(in_file, match='exact')[source]¶ The match argument can be either exact or recursive.
- exact will match only the specified file
- recursive will match all the files with the same prefix as the specified file
Parameters:
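The two match modes can be sketched with glob. The reading of recursive used here (every file whose path starts with the specified file path) is an interpretation of the description, and match_inputs and the file names are hypothetical; pype may select files differently.

```python
import glob
import os
import tempfile


def match_inputs(in_file, match="exact"):
    """Return the files selected by the exact or recursive match mode."""
    if match == "exact":
        return [in_file]
    if match == "recursive":
        # Escape the path so glob characters in it are taken literally.
        return sorted(glob.glob(glob.escape(in_file) + "*"))
    raise ValueError("match must be 'exact' or 'recursive'")


with tempfile.TemporaryDirectory() as folder:
    for name in ("sample.bam", "sample.bam.bai", "other.bam"):
        open(os.path.join(folder, name), "w").close()
    target = os.path.join(folder, "sample.bam")
    exact = [os.path.basename(p) for p in match_inputs(target)]
    recursive = [
        os.path.basename(p) for p in match_inputs(target, "recursive")]
```

A recursive match is convenient for inputs that come with companion files, such as an index stored next to the main file.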
-
add_output
(out_file)[source]¶
Parameters: out_file
-
-
class
pype.process.
Namespace
(program_dict, log, profile)[source]¶ A mechanism to load different environments.
Defines a basic abstraction layer to load programs and environments into the
Command
class.
Parameters: - program_dict (dict) – A dictionary with the following keys: namespace, version, dependencies. namespace is a string composed of the namespace type and the namespace item, separated by the @ character. The supported namespace types are docker, env_modules and path. The namespace item is a string relevant to the namespace type (e.g. the docker container repository url). version is a string defining the tag/version of the docker container or the version of the program to load (again, depending on the namespace type selected). dependencies is a key used only for the env_modules namespace; it loads other environment modules to satisfy the loading dependencies.
- log (
pype.logger.PypeLogger
) – Log object to append logging to the snippet log file
- profile (
pype.utils.profiles.Profile
) – Profile object
Raises: - SnippetNamespaceError – Wrong Namespace format, if the namespace has more than one @ character.
- SnippetNamespaceError – Not supported namespace, if the namespace type is not docker, env_modules or path.
- SnippetNamespaceError – All dependencies must be type env_module, if any of the dependencies defined in the dependencies key is not a namespace of the env_modules type.
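The namespace format and the first two error conditions can be sketched as a small parser. NamespaceFormatError is a hypothetical stand-in for SnippetNamespaceError, and split_namespace and the example values are assumptions; the sketch also rejects a string without any @, a case the description does not cover.

```python
SUPPORTED_TYPES = ("docker", "env_modules", "path")


class NamespaceFormatError(ValueError):
    """Hypothetical stand-in for pype's SnippetNamespaceError."""


def split_namespace(namespace):
    """Split 'type@item' and validate the namespace type."""
    parts = namespace.split("@")
    if len(parts) != 2:
        # More than one @ (or, in this sketch, none at all).
        raise NamespaceFormatError("Wrong Namespace format")
    namespace_type, item = parts
    if namespace_type not in SUPPORTED_TYPES:
        raise NamespaceFormatError("Not supported namespace")
    return namespace_type, item


# Hypothetical program_dict; the repository name and tag are made up.
program_dict = {"namespace": "docker@example/samtools", "version": "1.9"}
namespace_type, namespace_item = split_namespace(program_dict["namespace"])
```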
-
class
pype.process.
Volume
(path, output=False, bind_prefix='/var/lib/pype')[source]¶ Volume class to abstract and parametrize the binding of files while running commands in containerized environments.
The class also contains methods to adjust the bind volume argument to implementations such as udocker and singularity.
Init the class by defining the path in the host environment and the prefix in the container environment, and by flagging whether the path is an input or an output target.
Parameters: -
replace_bind_dirname
(bind_path)[source]¶ Replaces the bind volume in the container environment with the dirname of the specified bind path.
This is useful to give the same binding point to multiple paths (defined in multiple Volume classes) that are in the same folder in the host system.
Parameters: bind_path (str) – Binding point that replaces the current one, which is randomly generated by the class.
-
replace_bind_volume
(bind_path)[source]¶ Replaces the bind volume in the container environment with the specified bind path.
This is useful to manage binding point to multiple paths (defined in multiple Volume classes) that are subfolders of another bind volume in the host system.
Parameters: bind_path (str) – Binding point that replaces the current one, which is randomly generated by the class.
-
Misc¶
-
class
pype.misc.
DefaultHelpParser
(prog=None, usage=None, description=None, epilog=None, parents=[], formatter_class=<class 'argparse.HelpFormatter'>, prefix_chars='-', fromfile_prefix_chars=None, argument_default=None, conflict_handler='error', add_help=True, allow_abbrev=True)[source]¶
-
class
pype.misc.
SubcommandHelpFormatter
(prog, indent_increment=2, max_help_position=24, width=None)[source]¶
-
pype.misc.
xopen
(filename, mode='r')[source]¶ Wrap around open/gzip.open and stdin/out.
Replacement for the “open” function that can also open files that have been compressed with gzip. If the filename ends with .gz, the file is opened with gzip.open(). If it doesn’t, the regular open() is used. If the filename is ‘-’, standard output (mode ‘w’) or input (mode ‘r’) is returned.
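The rules above translate almost directly into code. The following is a minimal re-implementation sketch written from the description, not the source of pype.misc.xopen; xopen_sketch is a hypothetical name, and opening gzip files in text mode by default is an assumption of this sketch.

```python
import gzip
import os
import sys
import tempfile


def xopen_sketch(filename, mode="r"):
    """Open plain or gzip files; '-' maps to stdin or stdout."""
    if filename == "-":
        return sys.stdin if "r" in mode else sys.stdout
    if filename.endswith(".gz"):
        # Default to text mode so both branches yield str (an assumption).
        if "b" not in mode and "t" not in mode:
            mode += "t"
        return gzip.open(filename, mode)
    return open(filename, mode)


with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "reads.txt.gz")
    with xopen_sketch(path, "w") as handle:
        handle.write("ACGT\n")
    with xopen_sketch(path) as handle:
        content = handle.read()
    # Read the raw bytes to confirm the file really is gzip-compressed.
    with open(path, "rb") as handle:
        magic = handle.read(2)
```

The caller uses the same with-statement for compressed and uncompressed files, which is the point of wrapping both open functions behind one name.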