import os
import sys
import yaml
import shlex
import subprocess
from pype.misc import generate_uid
from time import sleep
from distutils.spawn import find_executable
def yaml_dump(command, snippet_name, requirements, dependencies, log, profile):
    """
    Register a snippet command in the `pipeline_runtime.yaml` file.

    The full `pype` command line is assembled from `command`, `profile`
    and the log path, a new run ID is generated, and an entry holding the
    command, its requirements and its dependencies is stored under that
    run ID. The yaml file lives in the parent directory of
    ``log.__path__``; existing entries in the file are preserved.

    :param command: The snippet name with valid arguments
    :type command: str
    :param snippet_name: Name of the snippet, only used in the log message
    :type snippet_name: str
    :param requirements: Snippet requirements, stored as-is in the yaml
    :param dependencies: Run IDs this snippet depends on, stored as-is
    :param log: Log object exposing ``__path__`` and a ``log`` logger
    :param profile: The name of the selected profile
    :type profile: str
    :return: The run ID under which the entry was recorded
    :rtype: str
    """
    pype_exec = find_executable('pype')
    if pype_exec is None:
        # No `pype` executable found: run the package through the
        # interpreter that is executing this code instead.
        pype_exec = '%s -m pype.commands' % sys.executable
    command = '%s --profile %s snippets --log %s %s' % (
        pype_exec, profile, log.__path__, command)
    run_id = generate_uid(10)[-10:]
    log.log.info('Queue yaml_dump, command: %s' % command)
    log.log.info('Queue yaml_dump, requirements: %s' % requirements)
    log.log.info('Queue yaml_dump, dependencies: %s' % dependencies)
    log.log.info('Queue yaml_dump, run ID: %s_%s' % (snippet_name, run_id))
    yaml_file = os.path.join(
        os.path.dirname(log.__path__), 'pipeline_runtime.yaml')
    runtime = None
    if os.path.isfile(yaml_file):
        with open(yaml_file, 'rt') as pipeline_runtime:
            runtime = yaml.safe_load(pipeline_runtime)
    # `safe_load` returns None for an empty file; start from an empty
    # mapping in that case instead of failing on the item assignment below.
    runtime = runtime or {}
    runtime[run_id] = {
        'command': command,
        'requirements': requirements,
        'dependencies': dependencies,
    }
    with open(yaml_file, 'wt') as pipeline_runtime:
        yaml.dump(runtime, pipeline_runtime, default_flow_style=False)
    # NOTE(review): fixed one second pause kept from the original code;
    # presumably spaces out consecutive registrations - confirm.
    sleep(1)
    return run_id
class SnippetRuntime:
    """
    A class to help building queue modules implementation for `bio_pype`.

    A helper class that generalizes various tasks to build queue modules
    and in the meantime creates a `yaml` file that records running jobs and
    job dependencies, agnostic of the underlying queueing system used.
    """

    def __init__(self, command, log, profile):
        """
        :param command: The snippet name with valid arguments
        :type command: str
        :param log: Log object of the main pipeline
        :type log: pype.logger.PypeLogger
        :param profile: The name of the selected profile
        :type profile: str

        A Usage example of this class is the following
        implementation of the pbs (torque) queue system:

        .. literalinclude:: ../test/data/queues/pbs.py
           :language: python
           :caption:
           :name: pbs
        """
        pype_exec = find_executable('pype')
        self.log = log
        if pype_exec is None:
            # No `pype` executable found: run the package through the
            # interpreter that is executing this code instead.
            pype_exec = '%s -m pype.commands' % sys.executable
        self.command = '%s --profile %s snippets --log %s %s' % (
            pype_exec, profile, self.log.__path__, command)
        self.run_id = generate_uid(10)[-10:]
        # The runtime yaml file sits next to the snippet log.
        self.runtime_dir = os.path.dirname(self.log.__path__)
        self.runtime_file = os.path.join(
            self.runtime_dir, 'pipeline_runtime.yaml')
        # Number of submissions attempted so far through `submit_queue`.
        self.submit_attempts = 0
        # Seconds to wait after each submission attempt.
        self.sleep = 1

    def get_runtime(self, requirements, dependencies):
        """
        Load the runtime object, if it does not exist
        initiate a new runtime dictionary.

        An entry for this snippet (command, requirements and dependencies)
        is then added to the runtime dictionary under its run ID.

        :param requirements: Dictionary specifying the snippet requirements
        :type requirements: dict
        :param dependencies: List of other snippets ids to which this snippet
            depends (it will run if/when the other jobs are terminated)
        :type dependencies: list
        """
        runtime = None
        if os.path.isfile(self.runtime_file):
            with open(self.runtime_file, 'rt') as pipeline_runtime:
                runtime = yaml.safe_load(pipeline_runtime)
        # `safe_load` returns None for an empty file; fall back to an empty
        # mapping so the item assignment below cannot fail on None.
        self.runtime = runtime or {}
        self.runtime[self.run_id] = {
            'command': self.command,
            'requirements': requirements,
            'dependencies': dependencies,
        }

    def add_queue_id(self, queue_id):
        """
        Add a job ID for the snippet.

        This is useful when the queue command is not submitted using
        :meth:`SnippetRuntime.submit_queue`, so the job id
        is not automatically registered in the runtime object.

        :param queue_id: Job id string
        :type queue_id: str
        """
        self.runtime[self.run_id]['queue_id'] = queue_id

    def add_queue_commands(self, commands):
        """
        Add the list of commands to launch the job in the queue
        system.

        The commands will be run in a pipe, so the output of the first
        item in the command list will be `stdin` of the second item,
        and so on.

        :param commands: List of string with the commands
        :type commands: list
        """
        self.runtime[self.run_id]['queue_commands'] = commands

    @staticmethod
    def _run_pipeline(commands):
        """
        Run `commands` chained as a pipe and return the decoded, stripped
        `stdout` of the last command.
        """
        processes = []
        for command in commands:
            upstream = processes[-1].stdout if processes else None
            processes.append(
                subprocess.Popen(
                    shlex.split(command), stdin=upstream,
                    stdout=subprocess.PIPE))
            if upstream is not None:
                # Drop the parent's copy of the pipe so the producer
                # receives SIGPIPE if the consumer exits early.
                upstream.close()
        out = processes[-1].communicate()[0]
        # Reap the upstream processes so they do not linger as zombies.
        for process in processes[:-1]:
            process.wait()
        return out.strip().decode('UTF-8')

    def submit_queue(self, retry=1):
        """
        Execute the queue commands, and add the resulting job id in the
        runtime dictionary.

        A submission is considered failed when the command pipe prints
        nothing on `stdout`. In that case the submission is repeated for as
        long as :attr:`SnippetRuntime.submit_attempts` does not exceed
        `retry`; the counter is kept on the instance and is not reset
        between calls. After every attempt the method waits
        :attr:`SnippetRuntime.sleep` seconds.

        :param retry: Number of additional attempts after a failed
            submission before failing, defaults to 1
        :type retry: int, optional
        :raises ValueError: If the list of queue commands is empty
        :raises Exception: If no job id was returned by any attempt
        """
        commands = self.runtime[self.run_id]['queue_commands']
        if not commands:
            raise ValueError(
                'No queue commands registered for %s' % self.command)
        while True:
            self.log.log.info(
                'Process queue command line %s' % ' | '.join(commands))
            out = self._run_pipeline(commands)
            self.runtime[self.run_id]['queue_id'] = out
            sleep(self.sleep)
            self.submit_attempts += 1
            if out or self.submit_attempts > retry:
                break
            # Parenthesised on purpose: `%` binds tighter than `+`.
            self.log.log.info(
                'New attempt to submit the job on the queue, %i'
                % (self.submit_attempts + 1))
        if not out:
            error_msg = (
                'Command %s could not be submitted to the'
                ' queue' % self.command)
            self.log.log.error(error_msg)
            raise Exception(error_msg)
        self.log.log.info('Command result with job ID: %s' % out)

    def queue_depends(self):
        """
        Returns the list of queue ids to which this command depends

        The list in the runtime dictionary, in the key `dependencies`
        consists of unique ids of the runtime object, this method
        simply converts the runtime ids into queue ids.

        :return: Queue id dependency list
        :rtype: list
        """
        # A missing (None) dependency list is treated as "no dependencies".
        run_ids = self.runtime[self.run_id]['dependencies'] or ()
        return [self.runtime[dep]['queue_id'] for dep in run_ids]

    def commit_runtime(self):
        """
        Save the runtime dictionary in the `pipeline_runtime.yaml` file

        The path of `pipeline_runtime.yaml` is the parent directory of the
        snippet log. Nothing is written when the runtime file path is empty.
        """
        if self.runtime_file:
            with open(self.runtime_file, 'wt') as pipeline_runtime:
                yaml.dump(
                    self.runtime, pipeline_runtime, default_flow_style=False)

    def change_sleep(self, sleep_sec):
        """
        Change the number of seconds to wait after submitting a job in the
        queue system.

        It is used in :meth:`SnippetRuntime.submit_queue`.
        It alters the attribute :attr:`SnippetRuntime.sleep`

        :param sleep_sec: Number of seconds
        :type sleep_sec: int
        """
        self.sleep = sleep_sec