Source code for pype.misc

import os
import sys
import gzip
import datetime
import secrets
import string
import types
from pathlib import Path
from pype import argparse
from importlib.util import spec_from_file_location, module_from_spec
from typing import Set, Dict, Any, Optional, Union, List


def import_module(module_name: str, module_path: str) -> types.ModuleType:
    """Import a module from a specific file path.

    The module is registered in ``sys.modules`` under ``module_name`` before
    its code is executed. If executing the module raises, that registration
    is removed again so a half-initialised module is never left importable.

    Args:
        module_name: Name to give to the imported module.
        module_path: Path to the module file.

    Returns:
        Imported module object.

    Raises:
        ImportError: If no module spec (or loader) can be built for the path.
        Exception: Anything raised by the module's own top-level code
            propagates unchanged.
    """
    spec = spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not create module specification for {module_path}")

    module = module_from_spec(spec)
    # Register before executing, so the module can be found under its name
    # while its own top-level code is running.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Roll back only our own entry; the module code may have replaced it.
        if sys.modules.get(module_name) is module:
            del sys.modules[module_name]
        raise
    return module
def generate_uid(n: int = 4, timestamp: bool = True) -> str:
    """Build an identifier from random characters, optionally time-prefixed.

    Args:
        n: Number of random characters (uppercase ASCII letters and digits).
        timestamp: When true, prefix the random part with the current local
            time formatted as ``%y%m%d%H%M%S.%f`` and an underscore.

    Returns:
        ``"<timestamp>_<random>"`` when ``timestamp`` is true, otherwise just
        the random characters.
    """
    alphabet = string.ascii_uppercase + string.digits
    suffix = "".join(secrets.choice(alphabet) for _ in range(n))
    if not timestamp:
        return suffix
    stamp = datetime.datetime.now().strftime("%y%m%d%H%M%S.%f")
    return f"{stamp}_{suffix}"
def package_modules(package: types.ModuleType) -> Set[str]:
    """Collect the dotted names of the ``.py`` modules directly inside a package.

    Only the first entry of ``package.__path__`` is scanned, non-recursively,
    and ``__init__.py`` is excluded.

    Args:
        package: Imported package object (must expose ``__path__``).

    Returns:
        Set of names of the form ``"<package name>.<file stem>"``.
    """
    package_dir = Path(package.__path__[0])
    found = set()
    for source_file in package_dir.glob("*.py"):
        if source_file.name == "__init__.py":
            continue
        found.add(f"{package.__name__}.{source_file.stem}")
    return found
def package_files(package: types.ModuleType, extension: str) -> Set[str]:
    """List files in a package directory whose names end with ``extension``.

    Only the first entry of ``package.__path__`` is scanned, non-recursively,
    using the glob pattern ``*<extension>``.

    Args:
        package: Imported package object (must expose ``__path__``).
        extension: Suffix to match, e.g. ``".yaml"``.

    Returns:
        Set of absolute file paths as strings.
    """
    package_dir = Path(package.__path__[0])
    pattern = f"*{extension}"
    matches = set()
    for entry in package_dir.glob(pattern):
        matches.add(str(entry.absolute()))
    return matches
def try_import(path: str, module_name: str) -> types.ModuleType:
    """Import a package directory, creating it and its ``__init__.py`` if needed.

    The package is loaded from ``<path>/<module_name>/__init__.py`` and
    registered in ``sys.modules`` under ``module_name``. If loading fails
    after registration, the entry is removed again so no half-initialised
    module stays importable.

    Args:
        path: Directory path where the module should be located/created.
        module_name: Name of the module to import.

    Returns:
        Imported module object.

    Raises:
        ImportError: If the module cannot be created or loaded. Any
            underlying exception (filesystem error, error raised by the
            module's own code) is chained as ``__cause__``.
    """
    module_path = Path(path) / module_name
    init_path = module_path / "__init__.py"
    module = None

    try:
        # Ensure directory and init file exist
        module_path.mkdir(parents=True, exist_ok=True)
        if not init_path.exists():
            init_path.touch()

        # Create module spec and load module
        spec = spec_from_file_location(module_name, str(init_path))
        if spec is None or spec.loader is None:
            raise ImportError(
                f"Could not create module specification for {module_name}"
            )

        module = module_from_spec(spec)
        sys.modules[module_name] = module  # Register the module in sys.modules
        spec.loader.exec_module(module)
        return module
    except Exception as e:
        # Undo our own registration only; leave any replacement entry alone.
        if module is not None and sys.modules.get(module_name) is module:
            del sys.modules[module_name]
        raise ImportError(
            f"Failed to import module '{module_name}' from path '{path}': {e}"
        ) from e
def get_modules(parent: types.ModuleType, subparsers: Any, progs: Dict) -> Dict:
    """Import every module of ``parent`` and register its sub-command.

    Modules are processed in sorted name order. For each one, its
    ``add_parser(subparsers, name)`` is called and the attribute named like
    the module itself is stored in ``progs[name]``.

    An ``AttributeError`` raised at any point while handling a module
    (including inside ``add_parser``) makes that module be skipped silently.

    Args:
        parent: Package whose modules are discovered via ``package_modules``.
        subparsers: Object passed through to each module's ``add_parser``.
        progs: Mapping updated in place with ``name -> entry point``.

    Returns:
        The same ``progs`` mapping that was passed in.
    """
    for dotted_name in sorted(package_modules(parent)):
        try:
            __import__(dotted_name)
            short_name = dotted_name.rsplit(".", 1)[-1]
            submodule = getattr(parent, short_name)
            submodule.add_parser(subparsers, short_name)
            entry_point = getattr(submodule, short_name)
            progs[short_name] = entry_point
        except AttributeError:
            continue
    return progs
def get_modules_names(parent: types.ModuleType) -> List[str]:
    """Get the short names of all importable modules in ``parent``.

    Modules are discovered with ``package_modules`` and imported one by one;
    a module whose import raises ``AttributeError`` is skipped. Names are
    returned in sorted order, so the result does not depend on set iteration
    order (which varies between interpreter runs for strings).

    Args:
        parent: Package whose modules are listed.

    Returns:
        Sorted list of module names without the package prefix.
    """
    modules = []
    for mod in sorted(package_modules(parent)):
        try:
            __import__(mod)
        except AttributeError:
            continue
        modules.append(mod.split(".")[-1])
    return modules
def get_module_method(
    parent: types.ModuleType, module: str, method: str
) -> Optional[Any]:
    """Look up ``parent.<module>.<method>`` without raising.

    Args:
        parent: Object holding the module as an attribute.
        module: Attribute name of the module on ``parent``.
        method: Attribute name to fetch from that module.

    Returns:
        The attribute, or ``None`` when either lookup fails.
    """
    not_found = object()
    target = getattr(parent, module, not_found)
    if target is not_found:
        return None
    return getattr(target, method, None)
def xopen(filename: str, mode: str = "r") -> Any:
    """Open a plain or gzip file, or map ``"-"`` to a standard stream.

    Args:
        filename: Path to open. ``"-"`` selects ``sys.stdin`` when ``mode``
            contains ``"r"`` and ``sys.stdout`` otherwise. Names ending in
            ``.gz`` are opened with ``gzip.open``; everything else with the
            builtin ``open``.
        mode: Mode string forwarded unchanged to the chosen opener.

    Returns:
        The opened file object or standard stream.

    Raises:
        TypeError: If ``filename`` is not a string.

    Note:
        ``gzip.open`` treats a mode without ``"t"`` as binary, so the default
        ``"r"`` yields bytes for ``.gz`` files but text for plain files. Pass
        ``"rt"`` / ``"wt"`` explicitly for text in both cases.
    """
    if not isinstance(filename, str):
        raise TypeError("Filename must be a string")

    if filename == "-":
        if "r" in mode:
            return sys.stdin
        return sys.stdout

    if filename.endswith(".gz"):
        return gzip.open(filename, mode)
    return open(filename, mode)
def check_exit_code(process, sting, results_dict, log):
    """Log a finished process's exit code and clean up after a failure.

    On a zero exit code only an info message is logged. On any other code the
    files listed in ``results_dict`` are removed (best effort: a failed
    removal is logged and skipped) and an exception is raised.

    Args:
        process: Object exposing a ``returncode`` attribute.
        sting: Label identifying the process in log and error messages.
            (The parameter name is kept as-is for keyword-call compatibility.)
        results_dict: Mapping whose values are iterables of file paths
            produced by the process.
        log: Object whose ``log`` attribute provides ``info``, ``warning``
            and ``error`` methods.

    Raises:
        Exception: If ``process.returncode`` is not ``0``.
    """
    # The label comes from the `sting` parameter; the bare name `string`
    # would resolve to the imported stdlib module instead.
    log.log.info(f"Checking exit code for process {sting}")
    code = process.returncode
    info = f"Process terminated, exit code: {code}"
    if code == 0:
        log.log.info(info)
        return

    log.log.error(info)
    log.log.warning("Removing results:")
    for paths in results_dict.values():
        for res in paths:
            try:
                log.log.warning(f"Attempt to remove results: {res}")
                os.remove(res)
            except OSError as e:
                log.log.warning(f"Failed to remove results: {res}; {e}")
    log.log.warning("Terminate the process")
    raise Exception(f"Process {sting} exited with code {code}")
def human_format(num: Union[int, float], base: int = 1000) -> str:
    """Format a number as a chain of unit-suffixed groups.

    The value is split into its largest unit and a remainder, and the
    remainder is formatted the same way, e.g. ``1500 -> "1K500"`` and
    ``1500000 -> "1M500K"``. Any fractional part of ``num`` is discarded.

    All arithmetic is done on integers, so remainders are exact
    (``1234 -> "1K234"``), the remainder uses the same ``base`` as the leading
    group, and a negative value carries a single leading ``-``.

    Args:
        num: Number to format.
        base: Size of one unit step (1000 for SI, 1024 for binary).

    Returns:
        Human readable string using the prefixes K, M, G, T, P, E, Z, Y.
    """
    prefixes = ["", "K", "M", "G", "T", "P", "E", "Z", "Y"]

    value = int(num)  # truncate toward zero; sub-unit fractions are dropped
    sign = "-" if value < 0 else ""
    value = abs(value)

    # Find the largest unit that fits, capped at the last known prefix.
    magnitude = 0
    unit = 1
    while value >= unit * base and magnitude < len(prefixes) - 1:
        magnitude += 1
        unit *= base

    whole, remainder = divmod(value, unit)
    tail = human_format(remainder, base) if remainder else ""
    return f"{sign}{whole}{prefixes[magnitude]}{tail}"
def bases_format(string_unit: str, base: int = 1000) -> int:
    """Convert a string such as ``"1.5M"`` or ``"10 kb"`` to a number of bases.

    The string must start with an unsigned decimal number, optionally
    followed by whitespace and a unit. Units are case-insensitive and may be
    one of ``K, M, G, T, P, E, Z, Y``, optionally with a trailing ``B``; a
    bare ``B`` or no unit means a multiplier of one.

    The number is parsed exactly rather than through ``float``, so values
    like ``"1.005K"`` give 1005 instead of a truncated 1004. A remaining
    fraction below one base is truncated toward zero.

    Args:
        string_unit: Text to parse.
        base: Multiplier per unit step (1000 by default).

    Returns:
        The value as an integer.

    Raises:
        ValueError: If the string is empty, has no leading number, has a
            malformed number, or uses an unknown unit.
    """
    # Local import keeps this block self-contained; used for exact parsing.
    from fractions import Fraction

    if not string_unit:
        raise ValueError("Empty string provided")

    symbols = ["", "K", "M", "G", "T", "P", "E", "Z", "Y"]
    # Both "K" and "KB" map to the same power of `base`.
    exponents = {}
    for power, symbol in enumerate(symbols):
        exponents[symbol] = power
        exponents[f"{symbol}B"] = power

    # Split number and unit
    split_at = 0
    while split_at < len(string_unit) and (
        string_unit[split_at].isdigit() or string_unit[split_at] == "."
    ):
        split_at += 1
    num = string_unit[:split_at]
    if not num:
        raise ValueError("No number found in string")

    unit = string_unit[split_at:].strip().upper()
    if unit not in exponents:
        raise ValueError(f"Invalid unit: {unit}")

    try:
        value = Fraction(num)
    except ValueError:
        raise ValueError(f"Invalid number: {num}") from None

    return int(value * base ** exponents[unit])
def basename_no_extension(file_name: str) -> str:
    """Return the final path component with its last suffix removed.

    Only the last extension is stripped, so ``"archive.tar.gz"`` becomes
    ``"archive.tar"``.

    Args:
        file_name: File name or path.

    Returns:
        The stem of the path's final component.
    """
    as_path = Path(file_name)
    return as_path.stem
class Tee:
    """Minimal file-like object that duplicates writes to two targets."""

    def __init__(self, f1, f2):
        """Store the two targets.

        Args:
            f1: First object with a ``write`` method.
            f2: Second object with a ``write`` method.
        """
        self.f1, self.f2 = f1, f2

    def write(self, msg):
        """Write ``msg`` to the first target, then to the second."""
        self.f1.write(msg)
        self.f2.write(msg)

    def flush(self):
        """Flush both targets, skipping any that has no ``flush`` method.

        Present so a ``Tee`` works where a stream is expected to be
        flushable, e.g. ``print(..., file=tee, flush=True)``.
        """
        for stream in (self.f1, self.f2):
            flush = getattr(stream, "flush", None)
            if flush is not None:
                flush()
class SubcommandHelpFormatter(argparse.RawDescriptionHelpFormatter):
    """Help formatter that drops the first line of a sub-command action.

    For an action whose ``nargs`` is ``argparse.PARSER``, everything up to
    and including the first newline of its formatted help is removed; all
    other actions are formatted unchanged.
    """

    def _format_action(self, action):
        formatted = super()._format_action(action)
        if action.nargs != argparse.PARSER:
            return formatted
        # Keep only what follows the first newline (empty if there is none).
        return formatted.partition("\n")[2]
class DefaultHelpParser(argparse.ArgumentParser):
    """Argument parser that prints the full help text on a usage error."""

    def error(self, message):
        """Report ``message``, print the help text, and exit with status 2.

        The error line is written to ``sys.stderr``; the help text is
        produced by ``print_help()`` with its default destination.

        Args:
            message: Description of the usage error.

        Raises:
            SystemExit: Always, with exit code 2.
        """
        error_line = "error: %s\n" % message
        sys.stderr.write(error_line)
        self.print_help()
        raise SystemExit(2)