import os
import sys
import gzip
import datetime
import secrets
import string
import types
from pathlib import Path
from pype import argparse
from importlib.util import spec_from_file_location, module_from_spec
from typing import Set, Dict, Any, Optional, Union, List
[docs]
def import_module(module_name: str, module_path: str) -> types.ModuleType:
"""Import a module from a specific path.
Args:
module_name: Name to give to the imported module
module_path: Path to the module file
Returns:
Imported module object
Raises:
ImportError: If module cannot be loaded
"""
spec = spec_from_file_location(module_name, module_path)
if spec is None or spec.loader is None:
raise ImportError(f"Could not create module specification for {module_path}")
module = module_from_spec(spec)
sys.modules[module_name] = module # Register the module in sys.modules
spec.loader.exec_module(module)
return module
[docs]
def generate_uid(n: int = 4, timestamp: bool = True) -> str:
"""Generate a unique identifier with timestamp and random chars."""
timestamp_str = datetime.datetime.now().strftime("%y%m%d%H%M%S.%f")
random_str = "".join(
[secrets.choice(string.ascii_uppercase + string.digits) for _ in range(n)]
)
if timestamp:
return f"{timestamp_str}_{random_str}"
else:
return random_str
[docs]
def package_modules(package: types.ModuleType) -> Set[str]:
"""Get all Python modules in a package directory."""
path = Path(package.__path__[0])
return {
f"{package.__name__}.{module.stem}"
for module in path.glob("*.py")
if module.name != "__init__.py"
}
[docs]
def package_files(package: types.ModuleType, extension: str) -> Set[str]:
"""Get all files with specific extension in a package directory."""
path = Path(package.__path__[0])
return {str(file.absolute()) for file in path.glob(f"*{extension}")}
[docs]
def try_import(path: str, module_name: str) -> types.ModuleType:
"""Import a module, creating __init__.py if needed.
Args:
path: Directory path where the module should be located/created
module_name: Name of the module to import
Returns:
Imported module object
Raises:
ImportError: If module cannot be created or loaded
"""
module_path = Path(path) / module_name
init_path = module_path / "__init__.py"
try:
# Ensure directory and init file exist
module_path.mkdir(parents=True, exist_ok=True)
if not init_path.exists():
init_path.touch()
# Create module spec and load module
spec = spec_from_file_location(module_name, str(init_path))
if spec is None or spec.loader is None:
raise ImportError(
f"Could not create module specification for {module_name}"
)
module = module_from_spec(spec)
sys.modules[module_name] = module # Register the module in sys.modules
spec.loader.exec_module(module)
return module
except Exception as e:
raise ImportError(
f"Failed to import module '{module_name}' from path '{path}': {e}"
) from e
[docs]
def get_modules(parent: types.ModuleType, subparsers: Any, progs: Dict) -> Dict:
"""Get all modules and their parsers."""
for mod_path in sorted(package_modules(parent)):
try:
__import__(mod_path)
mod_name = mod_path.split(".")[-1]
module = getattr(parent, mod_name)
module.add_parser(subparsers, mod_name)
progs[mod_name] = getattr(module, mod_name)
except AttributeError:
pass
return progs
[docs]
def get_modules_names(parent: types.ModuleType) -> List[str]:
"""Get names of all valid modules in parent."""
mods = package_modules(parent)
modules = []
for mod in mods:
try:
__import__(mod)
mod_name = mod.split(".")[-1]
modules.append(mod_name)
except AttributeError:
pass
return modules
[docs]
def get_module_method(
parent: types.ModuleType, module: str, method: str
) -> Optional[Any]:
"""Get a specific method from a module."""
try:
mod = getattr(parent, module)
return getattr(mod, method)
except AttributeError:
return None
[docs]
def xopen(filename: str, mode: str = "r") -> Any:
"""Smart file opener that handles gzip and stdin/stdout."""
if not isinstance(filename, str):
raise TypeError("Filename must be a string")
if filename == "-":
return sys.stdin if "r" in mode else sys.stdout
opener = gzip.open if filename.endswith(".gz") else open
return opener(filename, mode)
[docs]
def check_exit_code(process, sting, results_dict, log):
log.log.info("Checking exit code for process %s" % string)
code = process.returncode
info = "Process terminated, exit code: %s" % code
if code == 0:
log.log.info(info)
else:
log.log.error(info)
log.log.warning("Removing results:")
for result in results_dict:
for res in results_dict[result]:
try:
log.log.warning("Attempt to remove results: %s" % res)
os.remove(res)
except OSError as e:
log.log.warning("Failed to remove results: %s; %s" % (res, e))
log.log.warning("Terminate the process")
raise Exception("Process %s exited with code %s" % (string, code))
[docs]
def basename_no_extension(file_name: str) -> str:
"""Get basename without extension."""
return Path(file_name).stem
[docs]
class Tee:
[docs]
def __init__(self, f1, f2):
self.f1, self.f2 = f1, f2
[docs]
def write(self, msg):
self.f1.write(msg)
self.f2.write(msg)
[docs]
class DefaultHelpParser(argparse.ArgumentParser):
[docs]
def error(self, message):
sys.stderr.write("error: %s\n" % message)
self.print_help()
sys.exit(2)