"""
Configuration parsing and XML validation utilities for ClinVar Build.
This module provides utilities for parsing configuration files, and managing
logging output for long-running operations. It includes classes for handling
block-based configuration files and property management with controlled access.
"""
import re
import os
import sys
import warnings
import logging
from pathlib import Path
from typing import Self
from clinvar_build.constants import (
UtilsConfigData as ConfigNames,
_CONFIG_DIR,
)
from clinvar_build.errors import (
is_type,
)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
class ManagedProperty(object):
"""
A generic property factory defining setters and getters, with optional
type validation.
Parameters
----------
name : `str`
The name of the setters and getters
types: `Type`, default `NoneType`
Either a single type, or a tuple of types to test against.
Methods
-------
enable_setter()
Enables the setter for the property, allowing attribute assignment.
disable_setter()
Disables the setter for the property, making the property read-only.
set_with_setter(instance, value)
Enables the setter, sets the property value, and then disables
the setter, ensuring controlled updates.
Returns
-------
property
A property object with getter and setter.
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def __init__(self, name: str, types: tuple[type] | type | None = None):
"""
Initialize the ManagedProperty.
"""
self.name = name
self.types = types
self._setter_enabled = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# NOTE owner is part of the descriptor protocol for __get__, leave it
[docs]
def __get__(self, instance, owner):
"""Getter for the property."""
if instance is None:
return self
return instance.__dict__.get(self.name)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def __set__(self, instance, value):
"""Setter for the property."""
owner = type(instance)
if not self._setter_enabled:
raise AttributeError(f"The property '{self.name}' on "
f"{owner.__name__} is read-only.")
if self.types and not isinstance(value, self.types):
raise ValueError(
f"Expected any of {self.types}, got {type(value)} "
f"for property '{self.name}'."
)
instance.__dict__[self.name] = value
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def enable_setter(self):
"""Enable the setter for the property."""
self._setter_enabled = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def disable_setter(self):
"""Disable the setter for the property."""
self._setter_enabled = False
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def set_with_setter(self, instance, value):
"""
Enable the setter, set the property value, and then disable the setter.
Parameters
----------
instance : `object`
The instance on which the property is being set.
value : `any`
The value to assign to the property.
"""
try:
self.enable_setter()
setattr(instance, self.name, value)
finally:
self.disable_setter()
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[docs]
class BlockConfigParser(object):
    """
    Parses configuration files with headers and concatenated content,
    essentially parsing a block of text to a single string.

    This parser identifies sections marked by headers wrapped in square
    brackets and appends all subsequent lines (until the next header) into
    a single string for each section. Lines wrapped in double square
    brackets (``[[name]]``) delimit groups of headers, which can be selected
    when calling the parser. Lines starting with ``#`` and empty lines are
    skipped, as is any content appearing before the first header.

    Parameters
    ----------
    path : `str` or `Path`
        Path to the configuration file to be parsed.

    Attributes
    ----------
    parsed_data : `dict` [`str`, `str`]
        The parsed data.

    Examples
    --------
    Given a configuration file:

        [header1]
        This is line one
        This is line two
        [header2]
        Another section
        With multiple lines

    The parser will create:

        {
            'header1': 'This is line one\\nThis is line two',
            'header2': 'Another section\\nWith multiple lines'
        }
    """
    # properties
    path = ManagedProperty(ConfigNames.path)
    _data = ManagedProperty(ConfigNames.data, dict)
    parsed_data = ManagedProperty(ConfigNames.parsed_data, dict)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __init__(self, path: str | Path):
        '''
        Initialize the BlockConfigParser instance.
        '''
        # making sure the setter is only used during __init__
        getattr(type(self), ConfigNames.path).set_with_setter(self, path)
        getattr(type(self), ConfigNames.data).set_with_setter(self, {})
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __eq__(self, other):
        """
        Determine how instances are compared.

        Two parsers are equal when their parsed content is equal. While
        nothing has been parsed yet (empty content) the paths must match
        as well. Objects of another type are never equal.
        """
        if not isinstance(other, BlockConfigParser):
            # let Python try the reflected comparison / fall back to False
            return NotImplemented
        if self._data != other._data:
            return False
        if len(getattr(self, ConfigNames.data)) == 0:
            # nothing parsed yet, so the path is all that identifies us
            return self.path == other.path
        return True
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _process_line(self, line: str, sep: str) -> None:
        """
        Process a single (already stripped) configuration file line.

        A ``[header]`` line starts a new block; any other non-empty line is
        appended to the block of the current header, using `sep` to join it
        to content that is already there.

        Parameters
        ----------
        line : `str`
            The line to process.
        sep : `str`
            The separator placed between consecutive lines of a block.

        Returns
        -------
        None
            Modifies the internal data structure in place.
        """
        is_type(line, str)
        data = getattr(self, ConfigNames.data)
        # a header is wrapped in single brackets; [[...]] is a group marker
        is_header = (
            line.startswith("[") and line.endswith("]")
            and not (line.startswith("[[") and line.endswith("]]"))
        )
        if is_header:
            current = line.strip("[]")
            setattr(self, ConfigNames.current, current)
            # a repeated header keeps appending to the existing content
            data.setdefault(current, "")
            return
        # default of None: no header seen yet (or __call__ not run yet)
        current = getattr(self, ConfigNames.current, None)
        if current is None or not line:
            return
        if data[current]:
            data[current] += sep
        data[current] += line
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __call__(self,
                 section: str | None = None, sep='\n') -> Self:
        """
        Parse the configuration file into sections with concatenated content.

        Parameters
        ----------
        section : `str` or `None`, default `None`
            The specific section to parse from the configuration file. If
            provided, only content within the matching section delimiters
            (``[[section]]``) will be processed; parsing stops at the next
            ``[[...]]`` line carrying a different name. If None (default),
            all sections are parsed and the ``[[...]]`` lines are ignored.
        sep : `str`, default `\\n`
            The separator to indicate a new line.

        Returns
        -------
        Self
            The parser instance with populated data.
        """
        is_type(section, (type(None), str))
        is_type(sep, str)
        # Clear existing data without replacing the dictionary
        getattr(self, ConfigNames.data).clear()
        # set defaults
        in_section = False
        setattr(self, ConfigNames.current, None)
        path = getattr(self, ConfigNames.path)
        with open(path, "r", encoding="utf-8") as file:
            for line in file:
                line = line.strip()
                if line.startswith("[[") and line.endswith("]]"):
                    if section is None:
                        # no selection requested: the marker is just skipped
                        continue
                    if line.strip("[]").strip() == section:
                        in_section = True
                    elif in_section:
                        # a different [[ ]] ends the requested section
                        break
                    continue
                if line.startswith("#"):
                    # skip comments (line is stripped, so this also covers
                    # indented comments)
                    continue
                # when a section is requested, lines outside it are dropped
                if section is None or in_section:
                    self._process_line(line=line, sep=sep)
        # set data
        getattr(type(self), ConfigNames.parsed_data).set_with_setter(
            self, getattr(self, ConfigNames.data))
        # return
        return self
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __str__(self):
        """String representation of the parsed data."""
        data = getattr(self, ConfigNames.data)
        result = [f"{self.__class__.__name__}"]
        if not data:
            j = ""
            result.append(f"path={getattr(self, ConfigNames.path)}")
        else:
            j = "\n"
            # Determine the maximum key length for alignment
            max_key_length = max((len(key) for key in data), default=0)
            for section, value in data.items():
                result.append(f"[{section}]")
                if isinstance(value, str):
                    # Indent each line of the string value
                    for line in value.split('\n'):
                        result.append(f"\t{line}")
                # Handle list values
                elif isinstance(value, list):
                    for item in value:
                        result.append(f"\t{item}")
                # Handle dictionary values (for backwards compatibility)
                elif isinstance(value, dict):
                    for key, val in value.items():
                        result.append(
                            f"\t{key:<{max_key_length}} {val}")
                # blank line closing the section
                result.append("")
        return j.join(result)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __repr__(self):
        """Developer-friendly representation of the parsed data."""
        data = getattr(self, ConfigNames.data)
        result = [f"{self.__class__.__name__}("]
        if not data:
            j = ""
            result.append(f"path={getattr(self, ConfigNames.path)}")
        else:
            j = "\n"
            # Determine the maximum key length for alignment
            max_key_length = max((len(key) for key in data), default=0)
            for section, value in data.items():
                result.append(f" [{section}]")
                # Handle string values
                if isinstance(value, str):
                    # Show truncated string for repr
                    preview = (value[:50] + '...'
                               if len(value) > 50 else value)
                    result.append(f"\t{preview!r}")
                # Handle list values
                elif isinstance(value, list):
                    result.append(f"\t{value!r}")
                # Handle dictionary values (for backwards compatibility)
                elif isinstance(value, dict):
                    for key, val in value.items():
                        result.append(
                            f"\t{key:<{max_key_length}} {val}")
        result.append(")")
        return j.join(result)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs]
def check_environ(environ_variable: str = ConfigNames.config_dir,
                  fall_back: str | Path | None = _CONFIG_DIR) -> str:
    """
    Retrieve an environment variable pointing to a directory path, with
    optional fallback path.

    Attempts to retrieve the specified environment variable. If the
    variable is not set, the function will attempt to use the fallback
    path if provided. This is useful for configuration management where
    environment variables may not always be explicitly set.

    Parameters
    ----------
    environ_variable : `str`
        The name of the environment variable to retrieve.
    fall_back : `str`, `Path` or `None`
        A fallback path to return if the environment variable is not set. If
        None, an error will be raised when the environment variable is missing.

    Returns
    -------
    str
        A directory path. A `Path` fallback is converted to its string
        form so the return type does not depend on which branch was taken.

    Raises
    ------
    KeyError
        Raised when the environment variable is not set and
        fall_back is None.

    Notes
    -----
    The function will not check whether the path is available or whether
    permissions allow for read or write access. Argument types are checked
    with `is_type`, which is expected to raise on a mismatch.

    Warnings
    --------
    UserWarning
        Issued when the environment variable is not set and the
        fallback path is used instead.

    Examples
    --------
    >>> check_environ("MY_VAR")
    '/path/to/default/config'
    """
    # check input
    is_type(environ_variable, str)
    is_type(fall_back, (str, Path, type(None)))
    # the environment variable takes precedence when it is set
    try:
        return os.environ[environ_variable]
    except KeyError:
        if fall_back is None:
            # nothing to recover with: propagate the original KeyError
            raise
    # stacklevel=2 attributes the warning to the caller of check_environ
    warnings.warn(
        'The environmental variable `{}` is not set. Trying '
        'to recover using the default configuration files in '
        '{}.'.format(environ_variable, fall_back),
        UserWarning, stacklevel=2)
    return os.fspath(fall_back)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# Custom logging handler for in-place progress updates
class ProgressHandler(logging.StreamHandler):
    """
    Stream handler that redraws its output in place.

    Each emitted message first erases the message written before it, using
    ANSI escape codes (cursor-up and clear-line), and is then written in the
    same spot. Useful for progress updates during long-running operations.

    Parameters
    ----------
    stream : `file-like object`, optional
        Output stream. Defaults to sys.stdout.

    Attributes
    ----------
    _last_line_count : `int`
        Number of lines in the previous message, used to calculate
        how far to move the cursor up.

    Examples
    --------
    >>> progress_logger = logging.getLogger('progress')
    >>> handler = ProgressHandler()
    >>> handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    >>> progress_logger.addHandler(handler)
    >>> progress_logger.info("Processing: 100 records")
    >>> progress_logger.info("Processing: 200 records")  # Overwrites previous
    """
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def __init__(self, stream=None):
        super().__init__(sys.stdout if stream is None else stream)
        self._last_line_count = 0
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    @staticmethod
    def _erase_sequence(count: int) -> str:
        """
        Build the escape sequence wiping the `count` lines above the cursor:
        move up, clear every line while stepping down, then move up again.
        """
        cursor_up = f"\033[{count}A"
        return cursor_up + "\033[K\n" * count + cursor_up
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a log record with in-place updating.

        Parameters
        ----------
        record : `logging.LogRecord`
            The log record to emit
        """
        try:
            text = self.format(record)
            previous = self._last_line_count
            # wipe whatever the previous call left on screen
            if previous > 0:
                self.stream.write(self._erase_sequence(previous))
            # draw the new message and push it out straight away
            self.stream.write(text + '\n')
            self.stream.flush()
            # one line per newline in the message, plus the first line
            self._last_line_count = text.count('\n') + 1
        except Exception:
            self.handleError(record)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def reset(self) -> None:
        """
        Forget the previous message so the next one is not drawn over it.

        Call this when switching from in-place updates to normal logging
        to prevent cursor position issues.
        """
        self._last_line_count = 0