# Source code for pype.utils.queues

import os
import sys
import yaml
import shlex
import subprocess
from pype.misc import generate_uid
from time import sleep
from distutils.spawn import find_executable


def yaml_dump(command, snippet_name, requirements, dependencies, log, profile):
    """Register a snippet run in the ``pipeline_runtime.yaml`` file.

    Builds the full ``pype`` command line for the snippet, assigns it a
    short unique run id and records command, requirements and dependencies
    in the runtime yaml located next to the pipeline log.

    :param command: The snippet name with valid arguments
    :param snippet_name: Name of the snippet (used for logging only)
    :param requirements: Dictionary with the snippet requirements
    :param dependencies: List of run ids this snippet depends on
    :param log: Log object of the main pipeline
    :param profile: The name of the selected profile
    :return: The generated run id string
    """
    # Prefer the installed `pype` entry point; fall back to module execution.
    executable = find_executable('pype')
    if executable is None:
        executable = '%s -m pype.commands' % sys.executable
    command = '%s --profile %s snippets --log %s %s' % (
        executable, profile, log.__path__, command)
    run_id = generate_uid(10)[-10:]
    log.log.info('Queue yaml_dump, command: %s' % command)
    log.log.info('Queue yaml_dump, requirements: %s' % requirements)
    log.log.info('Queue yaml_dump, dependencies: %s' % dependencies)
    log.log.info('Queue yaml_dump, run ID: %s_%s' % (snippet_name, run_id))
    runtime_file = os.path.join(
        os.path.dirname(log.__path__), 'pipeline_runtime.yaml')
    # Merge into any runtime file left by previously submitted snippets.
    runtime = {}
    if os.path.isfile(runtime_file):
        with open(runtime_file, 'rt') as handle:
            runtime = yaml.safe_load(handle)
    runtime[run_id] = {
        'command': command,
        'requirements': requirements,
        'dependencies': dependencies}
    with open(runtime_file, 'wt') as handle:
        yaml.dump(runtime, handle, default_flow_style=False)
    sleep(1)
    return run_id


class SnippetRuntime:
    """A class to help building queue modules implementation for `bio_pype`.

    A helper class that generalizes various tasks needed to build queue
    modules and in the meantime creates a `yaml` file
    (``pipeline_runtime.yaml``) that records running jobs and job
    dependencies, agnostic of the underlying queueing system used.
    """

    def __init__(self, command, log, profile):
        """
        :param command: The snippet name with valid arguments
        :type command: str
        :param log: Log object of the main pipeline
        :type log: pype.logger.PypeLogger
        :param profile: The name of the selected profile
        :type profile: str

        A usage example of this class is the following implementation of
        the pbs (torque) queue system:

        .. literalinclude:: ../test/data/queues/pbs.py
            :language: python
            :caption:
            :name: pbs
        """
        # Fall back to module execution when the `pype` entry point is
        # not on PATH (e.g. development checkouts).
        pype_exec = find_executable('pype')
        self.log = log
        if pype_exec is None:
            pype_exec = '%s -m pype.commands' % sys.executable
        self.command = '%s --profile %s snippets --log %s %s' % (
            pype_exec, profile, self.log.__path__, command)
        # Short unique id identifying this snippet in the runtime file.
        self.run_id = generate_uid(10)[-10:]
        self.runtime_dir = os.path.dirname(self.log.__path__)
        self.runtime_file = os.path.join(
            self.runtime_dir, 'pipeline_runtime.yaml')
        self.submit_attempts = 0
        # Seconds to wait after each submission; see change_sleep().
        self.sleep = 1

    def get_runtime(self, requirements, dependencies):
        """Load the runtime object, or initiate a new runtime dictionary
        if the runtime file does not exist yet.

        :param requirements: Dictionary specifying the snippet requirements
        :type requirements: dict
        :param dependencies: List of other snippet ids on which this
            snippet depends (it will run if/when the other jobs are
            terminated)
        :type dependencies: list
        """
        if os.path.isfile(self.runtime_file):
            with open(self.runtime_file, 'rt') as pipeline_runtime:
                self.runtime = yaml.safe_load(pipeline_runtime)
        else:
            self.runtime = {}
        self.runtime[self.run_id] = {}
        self.runtime[self.run_id]['command'] = self.command
        self.runtime[self.run_id]['requirements'] = requirements
        self.runtime[self.run_id]['dependencies'] = dependencies

    def add_queue_id(self, queue_id):
        """Add a job ID for the snippet.

        This is useful when the queue command is not submitted using
        :meth:`SnippetRuntime.submit_queue`, so the job id is not
        automatically registered in the runtime object.

        :param queue_id: Job id string
        :type queue_id: str
        """
        self.runtime[self.run_id]['queue_id'] = queue_id

    def add_queue_commands(self, commands):
        """Add the list of commands to launch the job in the queue system.

        The commands will be run in a pipe, so the output of the first
        item in the command list will be `stdin` of the second item, and
        so on.

        :param commands: List of strings with the commands
        :type commands: list
        """
        self.runtime[self.run_id]['queue_commands'] = commands

    def submit_queue(self, retry=1):
        """Execute the queue commands, and add the resulting job id in the
        runtime dictionary.

        The method accepts a number of `retry` attempts, which will enable
        it to reiterate the specified number of times in case of failure,
        before failing the pipeline.

        :param retry: Number of attempts before failing, defaults to 1
        :type retry: int, optional
        :raises Exception: When no job id was produced after all attempts
        """
        submit_cmd = []
        pipe_nr = 1
        self.log.log.info(
            'Process queue command line %s' % ' | '.join(
                self.runtime[self.run_id]['queue_commands']))
        for command in self.runtime[self.run_id]['queue_commands']:
            if pipe_nr <= 1:
                submit_cmd.append(
                    subprocess.Popen(
                        shlex.split(command), stdout=subprocess.PIPE))
            else:
                # Chain this process onto the previous one's stdout.
                submit_cmd.append(
                    subprocess.Popen(
                        shlex.split(command), stdout=subprocess.PIPE,
                        stdin=submit_cmd[pipe_nr - 2].stdout))
            pipe_nr += 1
        # Close intermediate pipe ends in the parent so upstream commands
        # receive SIGPIPE if a downstream command exits early.
        for process in submit_cmd[:-1]:
            process.stdout.close()
        out = submit_cmd[pipe_nr - 2].communicate()[0]
        out = out.strip().decode('UTF-8')
        self.runtime[self.run_id]['queue_id'] = out
        sleep(self.sleep)
        self.submit_attempts += 1
        if out == str() and self.submit_attempts <= retry:
            # Fix: the original "'%i' % retry + 1" added an int to a
            # string, raising TypeError on every retry attempt.
            self.log.log.info(
                'New attempt to submit the job on the queue, %i' % (
                    retry + 1))
            # Fix: return here, otherwise a successful retry fell through
            # to the failure branch below and raised anyway.
            return self.submit_queue(retry)
        if out == str():
            error_msg = (
                'Command %s could not be submitted to the'
                ' queue' % self.command)
            self.log.log.error(error_msg)
            raise Exception(error_msg)
        self.log.log.info('Command result with job ID: %s' % out)

    def queue_depends(self):
        """Return the list of queue ids on which this command depends.

        The `dependencies` key in the runtime dictionary consists of
        unique ids of the runtime object; this method simply converts the
        runtime ids into queue ids.

        :return: Queue id dependency list
        :rtype: list
        """
        dependencies = []
        for dep in self.runtime[self.run_id]['dependencies']:
            dependencies.append(self.runtime[dep]['queue_id'])
        return dependencies

    def commit_runtime(self):
        """Save the runtime dictionary in the `pipeline_runtime.yaml` file.

        The path of `pipeline_runtime.yaml` is the parent directory of the
        snippet log.
        """
        if self.runtime_file:
            with open(self.runtime_file, 'wt') as pipeline_runtime:
                yaml.dump(
                    self.runtime, pipeline_runtime,
                    default_flow_style=False)

    def change_sleep(self, sleep_sec):
        """Change the number of seconds to wait after submitting a job in
        the queue system.

        It is used in :meth:`SnippetRuntime.submit_queue`. It alters the
        attribute :attr:`SnippetRuntime.sleep`.

        :param sleep_sec: Number of seconds
        :type sleep_sec: int
        """
        self.sleep = sleep_sec