import re
from pype.misc import xopen
from pype.modules.snippets import PYPE_SNIPPETS_MODULES
def compose_batch_description(batch_descritpion, str_description):
columns_names = batch_descritpion['required']
try:
optional_columns_names = batch_descritpion['optional']
except KeyError:
optional_columns_names = None
snippet = batch_descritpion['snippet']
if optional_columns_names:
description = (
'A tab separated text file that needs to have %i column '
'headers:\n %s\nAnd optionally the column headers:\n %s\n'
'Each row in the batch file will be independently \nrun '
'by the ''snippet %s') % (
len(columns_names), '\n '.join(columns_names),
'\n '.join(optional_columns_names), snippet
)
else:
description = (
'A tab separated text file that needs to have %i column '
'headers:\n %s\nEach row in the batch file will be '
'independently \nrun by the ''snippet %s') % (
len(columns_names), '\n '.join(columns_names), snippet
)
description = ('%s\nFor more details on the batch file column names '
'see the help message at:\n pype snippets %s\n'
'Where each argument corresponds to a column in the '
'batch file' % (description, snippet))
return '%s\n%s' % (str_description, description)
def get_arg_from_string(arg_string):
types_dict = {'s': 'str', 'i': 'int', 'f': 'float'}
arg_tag_str = r'\%\(.+\)[s,i,f]'
if re.match(arg_tag_str, arg_string):
arg_type = types_dict[arg_string[arg_string.find(')') + 1]]
arg = arg_string[arg_string.find('%(') + 2:arg_string.find(')')]
else:
arg_type = None
arg = None
return {'arg': arg, 'arg_type': arg_type}
def __add_argument_by_type__(argument_type):
arg_type_fn_dict = {
'composite_arg': CompositeArgument,
'batch_list_arg': BatchListArgument,
'batch_file_arg': BatchFileArgument,
'constant_arg': ConstantArgument,
'argv_arg': Argument}
try:
return arg_type_fn_dict[argument_type]
except KeyError:
raise Exception(
'Wrong argument type %s' % argument_type)
[docs]class PipelineItemArguments:
"""
An object to gather the :class:`Argument` of a :class:`PipelineItem`.
This is meant to collect the structure and the type of the
arguments defined in a pipeline yaml file.
"""
def __init__(self):
self.arguments = []
[docs] def add_argument(self, argument, argument_type='argv_arg'):
"""
Add the appropriate :class:`Argument` class to the
:class:`PipelineItemArguments` argument list
:param argument: An item from the list of arguments from the pipeline
yaml file. It should contain the keys `prefix` an `pipeline_arg`.
The key `prefix` indicate the flag usd in the snippet/pipeline to
which the :class:`PipelineItem` is configured to execute.
The key `pipeline_arg` indicate the keyword or object that the
pipeline engine need to interpret to convert into arguments and
also to construct the command line interface and.
:type argument: dict
:param argument_type: The type of argument, this parameter will select
which argument class would be used to parse the argument.
possible choices are composite_arg, batch_list_arg and
argv_arg. Default argv_arg.
:type argument_type: str
"""
self.arguments.append(
__add_argument_by_type__(argument_type)(
argument))
[docs] def to_dict(self, args_dict=None):
"""
Converts the argument in the :class:`PipelineItemArguments` into
dictionaries simlar to argparse
**Example**
"""
res_args = {}
batch_argv = []
for argument in self.arguments:
arg_argv = argument.to_argv(args_dict)
if argument.type.startswith('batch_'):
batch_argv = arg_argv
else:
if arg_argv[0] in res_args:
arg_val = res_args[arg_argv[0]]
if isinstance(arg_val, list):
res_args[arg_argv[0]] = arg_val + [arg_argv[1]]
else:
res_args[arg_argv[0]] = [arg_val, arg_argv[1]]
else:
res_args[arg_argv[0]] = arg_argv[1]
if len(batch_argv) > 0:
res_args_batch = []
for item in batch_argv:
temp_item = item.copy()
temp_item.update(res_args)
res_args_batch.append(temp_item)
res_args = res_args_batch
return res_args
class Argument:
def __init__(self, argument):
self.argument = argument
self.type = 'argv_arg'
self.value = argument['pipeline_arg']
self.key = argument['prefix']
try:
self.action = argument['action']
except KeyError:
self.action = 'store'
try:
self.nargs = argument['nargs']
except KeyError:
self.nargs = None
def to_argv(self, args_dict=None):
if args_dict is None:
return [self.key, self.value]
arg_key = get_arg_from_string(self.value)
value = args_dict[arg_key['arg']]
if self.action == 'store_false':
value = not value
return [self.key, value]
[docs]class CompositeArgument(Argument):
"""
A CompositeArgument retrieve the results from the results method
of the specified snippet. It will not appear listed in the
arguments help message so it's value is None.
In itself it contains a :class:`PipelineItemArguments` object, defining the
argument to pass to the results method of the snippets
"""
def __init__(self, argument):
super().__init__(argument)
self.arguments = PipelineItemArguments()
for argument_i in self.argument['pipeline_arg']['result_arguments']:
try:
arg_type = argument_i['type']
except KeyError:
arg_type = 'argv_arg'
arg_main = {
key: argument_i[key] for key in [
'prefix', 'pipeline_arg']}
self.arguments.add_argument(arg_main, arg_type)
self.type = 'composite_arg'
self.value = None
def to_argv(self, args_dict=None):
snippet = self.argument['pipeline_arg']['snippet_name']
res_key = self.argument['pipeline_arg']['result_key']
if args_dict is None:
value = self.value
else:
res_args = self.arguments.to_dict(args_dict)
if isinstance(res_args, dict):
res_args = [res_args]
value = []
for res_arg in res_args:
value.append(
PYPE_SNIPPETS_MODULES[snippet].results(res_arg)[res_key])
return(self.key, value)
[docs]class BatchFileArgument(Argument):
"""
BatchFileArgument read the arguments from a file and return the list of
arguments.
It is required for the execution of a batch snippet or batch pipeline.
"""
def __init__(self, argument):
super().__init__(argument)
self.type = 'batch_arg'
def to_argv(self, args_dict=None):
if args_dict is None:
batch_values = None
else:
batch_file = self.value % args_dict
batch_values = []
with xopen(batch_file, 'rt') as input_list:
argument_keys = next(input_list).strip().split('\t')
for line in input_list:
line_args = {}
line = line.strip().split('\t')
for index in range(len(argument_keys)):
line_args[argument_keys[index]] = line[index]
batch_values.append(line_args)
return batch_values
[docs]class BatchListArgument(Argument):
"""
BatchArgument read the arguments from a file and return the list of
arguments.
It is required for the execution of a batch snippet or batch pipeline.
"""
def __init__(self, argument):
super().__init__(argument)
self.type = 'batch_arg'
self.value = 'batch_list'
def to_argv(self, args_dict=None):
if args_dict is None:
batch_values = None
else:
batch_values = self.argument['pipeline_arg']
return batch_values
[docs]class ConstantArgument(Argument):
"""
xxxx
"""
def __init__(self, argument):
super().__init__(argument)
self.type = 'str_arg'
self.action = 'store'
self.nargs = None
def to_argv(self, args_dict=None):
return [self.key, self.value]