Source code for clinvar_build.utils.config_tools

"""
Configuration parsing and XML validation utilities for ClinVar Build.

This module provides utilities for parsing configuration files and managing
logging output for long-running operations. It includes classes for handling
block-based configuration files and property management with controlled access.
"""
import re
import os
import sys
import warnings
import logging
from pathlib import Path
from typing import Self
from clinvar_build.constants import (
    UtilsConfigData as ConfigNames,
    _CONFIG_DIR,
)
from clinvar_build.errors import (
    is_type,
)

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] class ManagedProperty(object): """ A generic property factory defining setters and getters, with optional type validation. Parameters ---------- name : `str` The name of the setters and getters types: `Type`, default `NoneType` Either a single type, or a tuple of types to test against. Methods ------- enable_setter() Enables the setter for the property, allowing attribute assignment. disable_setter() Disables the setter for the property, making the property read-only. set_with_setter(instance, value) Enables the setter, sets the property value, and then disables the setter, ensuring controlled updates. Returns ------- property A property object with getter and setter. """ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def __init__(self, name: str, types: tuple[type] | type | None = None): """ Initialize the ManagedProperty. """ self.name = name self.types = types self._setter_enabled = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # NOTE owner is part of the descriptor protocol for __get__, leave it
[docs] def __get__(self, instance, owner): """Getter for the property.""" if instance is None: return self return instance.__dict__.get(self.name)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def __set__(self, instance, value): """Setter for the property.""" owner = type(instance) if not self._setter_enabled: raise AttributeError(f"The property '{self.name}' on " f"{owner.__name__} is read-only.") if self.types and not isinstance(value, self.types): raise ValueError( f"Expected any of {self.types}, got {type(value)} " f"for property '{self.name}'." ) instance.__dict__[self.name] = value
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def enable_setter(self): """Enable the setter for the property.""" self._setter_enabled = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def disable_setter(self): """Disable the setter for the property.""" self._setter_enabled = False
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def set_with_setter(self, instance, value): """ Enable the setter, set the property value, and then disable the setter. Parameters ---------- instance : `object` The instance on which the property is being set. value : `any` The value to assign to the property. """ try: self.enable_setter() setattr(instance, self.name, value) finally: self.disable_setter()
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class BlockConfigParser(object):
    """
    Parses configuration files with headers and concatenated content,
    essentially parsing a block of text to a single string.

    This parser identifies sections marked by headers wrapped in square
    brackets and appends all subsequent lines (until the next header) into a
    single string for each section. Lines wrapped in double square brackets
    (``[[name]]``) delimit larger sections which can be selected when calling
    the instance.

    Parameters
    ----------
    path : `str` or `Path`
        Path to the configuration file to be parsed.

    Attributes
    ----------
    path : `str` or `Path`
        The path supplied at construction.
    parsed_data : `dict` [`str`, `str`]
        The parsed data, available after the instance has been called.

    Examples
    --------
    Given a configuration file:

        [header1]
        This is line one
        This is line two

        [header2]
        Another section
        With multiple lines

    The parser will create:
        {
            'header1': 'This is line one\\nThis is line two',
            'header2': 'Another section\\nWith multiple lines'
        }
    """
    # properties
    path = ManagedProperty(ConfigNames.path)
    _data = ManagedProperty(ConfigNames.data, dict)
    parsed_data = ManagedProperty(ConfigNames.parsed_data, dict)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __init__(self, path: str | Path):
        '''
        Initialize the ConfigParser instance.
        '''
        # making sure the setter is only used during _init_
        getattr(type(self), ConfigNames.path).set_with_setter(self, path)
        getattr(type(self), ConfigNames.data).set_with_setter(self, {})
        # no header seen yet; lets `_process_line` run before `__call__`
        setattr(self, ConfigNames.current, None)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __eq__(self, other):
        """
        Determine how instances are compared.

        Two parsers are equal when their parsed content is equal. While
        nothing has been parsed yet, the paths must match as well. Comparing
        with an object that is not a `BlockConfigParser` returns
        `NotImplemented`, so Python falls back to its default handling.
        """
        if not isinstance(other, BlockConfigParser):
            return NotImplemented
        if self._data != other._data:
            return False
        if len(getattr(self, ConfigNames.data)) == 0:
            # nothing parsed: the path is the only distinguishing feature
            return self.path == other.path
        return True
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _process_line(self, line: str, sep: str) -> None:
        """
        Process a single (already stripped) configuration file line.

        A ``[header]`` line starts a new block; any other non-empty line is
        appended to the block of the most recent header. Lines seen before
        the first header are ignored.

        Parameters
        ----------
        line : `str`
            The line to process.
        sep : `str`
            The separator inserted between consecutive lines of one block.

        Returns
        -------
        None
            Modifies the internal data structure in place.
        """
        is_type(line, str)
        data = getattr(self, ConfigNames.data)
        # single-bracket headers only; [[...]] is handled by the caller
        is_header = (
            line.startswith("[") and line.endswith("]") and
            not (line.startswith("[[") and line.endswith("]]"))
        )
        if is_header:
            current = line.strip("[]")
            setattr(self, ConfigNames.current, current)
            # a repeated header keeps its earlier content
            data.setdefault(current, "")
            return
        current = getattr(self, ConfigNames.current)
        if current is not None and line:
            # add lines to a single string
            if data[current]:
                data[current] += sep
            data[current] += line
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __call__(self, section: str|None = None, sep='\n') -> Self:
        """
        Parse the configuration file into sections with concatenated content.

        Parameters
        ----------
        section : `str` or `None`, default `None`
            The specific section to parse from the configuration file. If
            provided, only content following the matching section delimiter
            (``[[section]]``) and preceding the next, different delimiter is
            processed. If None (default), all content is parsed and the
            ``[[...]]`` delimiter lines themselves are skipped.
        sep : `str`, default ``'\\n'``
            The separator used to join the lines of a block.

        Returns
        -------
        Self
            The parser instance with populated data.
        """
        is_type(section, (type(None), str))
        is_type(sep, str)
        # Clear existing data without replacing the dictionary
        getattr(self, ConfigNames.data).clear()
        # set defaults
        in_section = False
        setattr(self, ConfigNames.current, None)
        with open(getattr(self, ConfigNames.path), "r",
                  encoding="utf-8") as file:
            for line in file:
                line = line.strip()
                if line.startswith("[[") and line.endswith("]]"):
                    if section is None:
                        # no filtering requested, drop the delimiter line
                        continue
                    if line.strip("[]").strip() == section:
                        in_section = True
                    elif in_section:
                        # a different [[ ]] ends the requested section
                        break
                    # skip the [[ ]] line itself
                    continue
                if section is not None and not in_section:
                    # content outside of the requested section
                    continue
                if line.startswith("#"):
                    # skip comments (the line is already stripped)
                    continue
                self._process_line(line=line, sep=sep)
        # set data
        getattr(type(self), ConfigNames.parsed_data).set_with_setter(
            self, getattr(self, ConfigNames.data))
        # return
        return self
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __str__(self):
        """String representation of the parsed data."""
        data = getattr(self, ConfigNames.data)
        result = [f"{self.__class__.__name__}"]
        if not data:
            # NOTE(review): joined without a separator, so this renders as
            # '<ClassName>path=...'; kept as is for backward compatibility.
            j = ""
            result.append(f"path={getattr(self, ConfigNames.path)}")
        else:
            j = "\n"
        # Width used to align the keys of dictionary values
        max_key_length = max((len(key) for key in data), default=0)
        for section, value in data.items():
            result.append(f"[{section}]")
            if isinstance(value, str):
                # Indent each line of the string value
                result.extend(f"\t{line}" for line in value.split('\n'))
            # Handle list values
            elif isinstance(value, list):
                result.extend(f"\t{item}" for item in value)
            # Handle dictionary values (for backwards compatibility)
            elif isinstance(value, dict):
                for key, val in value.items():
                    result.append(f"\t{key:<{max_key_length}} {val}")
            # blank line after each section
            result.append("")
        return j.join(result)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __repr__(self):
        """Developer-friendly representation of the parsed data."""
        data = getattr(self, ConfigNames.data)
        result = [f"{self.__class__.__name__}("]
        if not data:
            j = ""
            result.append(f"path={getattr(self, ConfigNames.path)}")
        else:
            j = "\n"
        # Width used to align the keys of dictionary values
        max_key_length = max((len(key) for key in data), default=0)
        for section, value in data.items():
            result.append(f"  [{section}]")
            # Handle string values
            if isinstance(value, str):
                # Show truncated string for repr
                preview = (value[:50] + '...' if len(value) > 50 else value)
                result.append(f"\t{preview!r}")
            # Handle list values
            elif isinstance(value, list):
                result.append(f"\t{value!r}")
            # Handle dictionary values (for backwards compatibility)
            elif isinstance(value, dict):
                for key, val in value.items():
                    result.append(f"\t{key:<{max_key_length}} {val}")
        result.append(")")
        return j.join(result)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def check_environ(environ_variable:str=ConfigNames.config_dir,
                  fall_back:str | Path | None = _CONFIG_DIR) -> str | Path:
    """
    Retrieve an environment variable pointing to a directory path, with
    optional fallback path.

    Attempts to retrieve the specified environment variable. If the variable
    is not set, the function will attempt to use the fallback path if
    provided. This is useful for configuration management where environment
    variables may not always be explicitly set.

    Parameters
    ----------
    environ_variable : `str`
        The name of the environment variable to retrieve.
    fall_back : `str`, `Path` or `None`
        A fallback path to return if the environment variable is not set.
        If None, an error will be raised when the environment variable is
        missing.

    Returns
    -------
    str or Path
        The value of the environment variable (always a `str`), or
        `fall_back` returned unchanged when the variable is not set.

    Raises
    ------
    KeyError
        Raised when the environment variable is not set and fall_back is
        None.
    TypeError
        Raised when environ_variable is not of type str or fall_back is
        not of type str, Path, or None.

    Notes
    -----
    The function will not check whether the path is available or whether
    permissions allow for read or write access

    Warnings
    --------
    UserWarning
        Issued when the environment variable is not set and the fallback
        path is used instead.

    Examples
    --------
    >>> check_environ("MY_VAR")
    '/path/to/default/config'
    """
    # check input
    is_type(environ_variable, str)
    is_type(fall_back, (str, Path, type(None)))
    # prefer the environment variable
    try:
        return os.environ[environ_variable]
    except KeyError:
        if fall_back is None:
            # nothing to recover with: re-raise with the original traceback
            raise
    # recover with the fallback; stacklevel=2 attributes the warning to the
    # caller instead of to this helper
    warnings.warn(
        'The environmental variable `{}` is not set. Trying '
        'to recover using the default configuration files in '
        '{}.'.format(environ_variable, fall_back),
        UserWarning,
        stacklevel=2,
    )
    return fall_back
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ # Custom logging handler for in-place progress updates
class ProgressHandler(logging.StreamHandler):
    """
    Stream handler that rewrites its previous message in place.

    Before each new message the handler writes ANSI escape codes that move
    the cursor up over the previous message and clear its lines, so the new
    message takes its place instead of being appended below it.

    Parameters
    ----------
    stream : `file-like object`, optional
        Output stream. Defaults to sys.stdout.

    Attributes
    ----------
    _last_line_count : `int`
        Number of lines in the previously emitted message, i.e. how far the
        cursor has to move up before the next message is written.

    Examples
    --------
    >>> progress_logger = logging.getLogger('progress')
    >>> handler = ProgressHandler()
    >>> handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    >>> progress_logger.addHandler(handler)
    >>> progress_logger.info("Processing: 100 records")
    >>> progress_logger.info("Processing: 200 records")  # Overwrites previous
    """
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __init__(self, stream=None):
        super().__init__(sys.stdout if stream is None else stream)
        self._last_line_count = 0
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _erase_previous(self) -> None:
        """Move the cursor over the previous message and clear its lines."""
        previous = self._last_line_count
        if previous <= 0:
            return
        jump_up = f"\033[{previous}A"
        # up to the first line of the previous message
        self.stream.write(jump_up)
        # clear every line, which walks the cursor back down again
        for _ in range(previous):
            self.stream.write("\033[K\n")
        # and back up, ready for the new message
        self.stream.write(jump_up)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a log record, replacing the previously emitted one.

        Parameters
        ----------
        record : `logging.LogRecord`
            The log record to emit
        """
        try:
            text = self.format(record)
            self._erase_previous()
            # Write new message
            self.stream.write(text + '\n')
            self.stream.flush()
            # Remember line count for next update
            self._last_line_count = text.count('\n') + 1
        except Exception:
            self.handleError(record)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def reset(self) -> None:
        """
        Forget the previous message.

        Call this when switching from in-place updates to normal logging to
        prevent cursor position issues.
        """
        self._last_line_count = 0