import os
import sys
import imp
import gzip
import argparse
import datetime
import random
import string
import types
import importlib.machinery
def import_module(module_name, module_path):
loader = importlib.machinery.SourceFileLoader(
module_name, module_path)
mod = types.ModuleType(loader.name)
loader.exec_module(mod)
return mod
def generate_uid(n=4):
random_str = ''.join(random.choice(
string.ascii_uppercase + string.digits) for _ in range(n))
random_str = '%s_%s' % (
datetime.datetime.now().strftime("%y%m%d%H%M%S.%f"), random_str)
return random_str
def package_modules(package):
pathname = package.__path__[0]
return {
'.'.join([
package.__name__,
os.path.splitext(module)[0]]) for module in os.listdir(
pathname) if module.endswith(
'.py') and not module.startswith('__init__')}
def package_files(package, extension):
pathname = package.__path__[0]
return {
os.path.abspath(
os.path.join(pathname, file)) for file in os.listdir(
pathname) if file.endswith(extension)}
def try_import(path, module_name):
if not os.path.isdir(os.path.join(path, module_name)):
os.makedirs(os.path.join(path, module_name))
try:
mod = imp.load_module(module_name, None,
os.path.join(path, module_name),
('', '', 5))
except ValueError:
init_path = os.path.join(path, module_name, '__init__.py')
try:
with open(init_path, 'a'):
os.utime(init_path, None)
mod = imp.load_module(module_name, None,
os.path.join(path, module_name),
('', '', 5))
except IOError:
raise Exception(('There is no directory in the path %s. '
'Check your configuration or create '
'the directory in the desired path.') %
os.path.join(path, module_name))
return mod
def get_modules(parent, subparsers, progs):
mods = package_modules(parent)
for mod in sorted(mods):
try:
__import__(mod)
mod_name = mod.split('.')[-1]
m = getattr(parent, mod_name)
m.add_parser(subparsers, mod_name)
progs[mod_name] = getattr(m, mod_name)
except AttributeError:
pass
return progs
def get_modules_names(parent):
mods = package_modules(parent)
modules = []
for mod in mods:
try:
__import__(mod)
mod_name = mod.split('.')[-1]
modules.append(mod_name)
except AttributeError:
pass
return modules
def get_module_method(parent, module, method):
mods = package_modules(parent)
result = None
for mod in mods:
try:
__import__(mod)
mod_name = mod.split('.')[-1]
if mod_name == module:
m = getattr(parent, mod_name)
result = getattr(m, method)
break
except AttributeError:
pass
return result
[docs]def xopen(filename, mode='r'):
"""
Wrap around open/gzip.open and stdin/out.
Replacement for the "open" function that can also open
files that have been compressed with gzip. If the filename ends with .gz,
the file is opened with gzip.open(). If it doesn't, the regular open()
is used. If the filename is '-', standard output (mode 'w') or input
(mode 'r') is returned.
"""
if not isinstance(filename, str):
raise AssertionError
if filename == '-':
return sys.stdin if 'r' in mode else sys.stdout
if filename.endswith('.gz'):
return gzip.open(filename, mode)
return open(filename, mode)
def check_exit_code(process, sting, results_dict, log):
log.log.info('Checking exit code for process %s' % string)
code = process.returncode
info = 'Process terminated, exit code: %s' % code
if code == 0:
log.log.info(info)
else:
log.log.error(info)
log.log.warning('Removing results:')
for result in results_dict:
for res in results_dict[result]:
try:
log.log.warning('Attempt to remove results: %s' % res)
os.remove(res)
except OSError as e:
log.log.warning('Failed to remove results: %s; %s'
% (res, e))
log.log.warning('Terminate the process')
raise Exception('Process %s exited with code %s' % (string, code))
def human_format(num, base=1000):
prefix_index = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
magnitude = 0
orig = num
dec = 0
while abs(num) >= base:
magnitude += 1
num /= float(base)
try:
dec = str(num).split('.')[1]
dec = float('0.%s' % dec) * base**magnitude
dec = human_format(int(dec))
if dec == '0':
dec = ''
except IndexError:
dec = ''
try:
return '%i%s%s' % (num, prefix_index[magnitude], dec)
except IndexError:
return '%i' % orig
def bases_format(string_unit, base=1000):
symbols = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
symbolsB = ['%sB' % x for x in symbols]
num = ''
while string_unit and \
string_unit[0:1].isdigit() or \
string_unit[0:1] == '.':
num += string_unit[0]
string_unit = string_unit[1:]
letter = string_unit.strip().upper()
if not (num.isdigit() and letter in symbols + symbolsB):
raise AssertionError
num = float(num)
if len(letter) <= 1 and letter != "B":
prefix = {symbols[0]: 1}
for i, unit in enumerate(symbols[1:]):
prefix[unit] = base**(i + 1)
else:
prefix = {symbolsB[0]: 1}
for i, unit in enumerate(symbolsB[1:]):
prefix[unit] = base**(i + 1)
return int(num * prefix[letter])
def basename_no_extension(file_name):
base_name = os.path.basename(file_name)
if '.' in base_name:
separator_index = base_name.index('.')
base_name = base_name[:separator_index]
return base_name
return base_name
class Tee():
def __init__(self, f1, f2):
self.f1, self.f2 = f1, f2
def write(self, msg):
self.f1.write(msg)
self.f2.write(msg)
[docs]class DefaultHelpParser(argparse.ArgumentParser):
[docs] def error(self, message):
sys.stderr.write('error: %s\n' % message)
self.print_help()
sys.exit(2)