Source code for clinvar_build.parser

"""
ClinVar XML to SQLite database parser

This module provides a comprehensive parser for converting ClinVar XML files
into SQLite databases. It implements a configuration-driven architecture that
supports both RCV (Reference ClinVar) and VCV (Variation
ClinVar) XML formats through JSON-based table and column specifications.

The parser is implemented using iterative parsing and batch commits, minimising
memory usage. Progress tracking is enabled through a SQL table, allowing for
restarts after the last committed record.
"""

# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# imports
import os
import sys
import json
import sqlite3
import logging
import argparse
import xml.etree.ElementTree as ET
import clinvar_build.utils.config_tools as cnf_tools
from datetime import datetime
from typing import (
    Any,
    Literal,
    get_args,
)
from pathlib import Path
from clinvar_build.errors import (
    is_type,
)
from clinvar_build.utils.general import (
    _check_directory,
    _check_directory_readable,
    _check_directory_writable,
)
from clinvar_build.utils.parser_tools import (
    configure_logging,
    SQLiteParser,
    open_xml_file,
)
from clinvar_build.constants import (
    ParserNames as PRNAM,
)

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# constants
CastType = Literal["int", "float", "bool", "str"]

# initiating a logger
logger = logging.getLogger(__name__)

# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class ClinVarParser(SQLiteParser):
    """
    Parser for ClinVar XML files.

    Records are streamed from the XML file, converted to rows according to
    the JSON-derived configuration and inserted into SQLite in batches. A
    progress table (when configured) stores the last committed accession so
    an interrupted run can be resumed.

    Parameters
    ----------
    config : dict[str, Any]
        A dictionary with instructions to parse an XML file to a SQLite
        database.

    Attributes
    ----------
    conn : `sqlite3.Connection` or `None`
        Active SQLite connection, or None if not connected.
    cursor : `sqlite3.Cursor` or `None`
        Cursor for executing SQL statements, or None if not connected.
    stats : `dict` [`str`, `int`]
        Statistics on parsed records

    Class Attributes
    ----------------
    _SQL_COLS_PATTERN : re.Pattern
        Compiled regex to extract column names from SQL INSERT statements
        (not defined in this class; presumably inherited from
        ``SQLiteParser`` -- TODO confirm).
    """
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Logger used for the non-verbose batch progress output. Nothing in this
    # class assigns it, so it stays None unless it is configured elsewhere.
    _progress_logger = None
    # Not used by any method of this class.
    _progress_handler = None
    # Accession of the record currently being processed; used in progress
    # updates and in error messages.
    _current_accession = None

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def parse_file(self,
                   xml_path: str | Path,
                   db_path: str | Path,
                   batch_size: int = 10,
                   validate_foreign_keys: bool = True,
                   enforce_foreign_keys: bool = False,
                   xsd_path: str | Path | None = None,
                   xsd_strict: bool = False,
                   resume: bool = True,
                   count_duplicates: bool = False
                   ) -> None:
        """
        Parse ClinVar XML file into SQLite database.

        Parameters
        ----------
        xml_path : `str` or `Path`
            Path to ClinVar XML file
        db_path : `str` or `Path`
            Path to SQLite database file
        batch_size : `int`, default 10
            Number of top-level records (including children) to commit at
            once.
        validate_foreign_keys : `bool`, default `True`
            Whether to validate foreign key integrity after parsing
        enforce_foreign_keys : `bool`, default `False`
            Whether to enforce foreign keys during parsing (slower but
            catches errors immediately). If False, foreign keys are only
            validated after parsing completes.
        xsd_path : `str`, `Path`, or `None`, default `None`
            Optional path to XSD schema for XML validation
        xsd_strict : `bool`, default `False`
            If False, allows XML elements not defined in XSD
        resume : `bool`, default True
            If True, attempt to resume from last checkpoint. If no
            checkpoint exists, starts fresh. If False, always starts fresh
            (existing data may cause constraint violations)
        count_duplicates : `bool`, default False
            After building the database print potential duplicated rows per
            table

        Raises
        ------
        ValueError
            Re-raised from ``validate_database`` when the foreign key
            validation fails.
        """
        # #### check input
        is_type(db_path, (type(None), str, Path))
        is_type(xml_path, (type(None), str, Path))
        is_type(xsd_path, (type(None), str, Path))
        is_type(xsd_strict, bool)
        is_type(batch_size, int)
        is_type(resume, bool)
        # start and set constants
        logger.info(f"Starting to parse file: {xml_path}")
        xml_filename = Path(xml_path).name
        meta: dict[str, Any] = self.config.get(PRNAM.meta)
        # #### open connection
        with self._connection(db_path):
            # #### check if parsing already completed
            parsing_skipped = resume and self._is_parsing_complete(
                xml_filename, meta.get(PRNAM.meta_table))
            # #### perform parsing
            if not parsing_skipped:
                self._perform_parsing(
                    xml_path=xml_path,
                    xml_filename=xml_filename,
                    meta=meta,
                    batch_size=batch_size,
                    xsd_path=xsd_path,
                    xsd_strict=xsd_strict,
                    resume=resume,
                    enforce_foreign_keys=enforce_foreign_keys,
                )
            # #### count duplicates (before validation)
            if count_duplicates:
                self.count_duplicates()
            # #### validation and stats (common to both paths)
            self.cursor.execute("PRAGMA foreign_keys = ON")
            if validate_foreign_keys:
                try:
                    _ = self.validate_database()
                except ValueError as e:
                    logger.error(
                        f"Foreign key validation failed:\n"
                        f" {e}\n"
                        f" Re-run with -vv to see detailed errors during "
                        f"parsing."
                    )
                    raise

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    @staticmethod
    def _cast_value(value: Any, cast_type: CastType | None) -> Any:
        """
        Cast value to specified type.

        Parameters
        ----------
        value : `any`
            Value to cast
        cast_type : {`int`, `float`, `bool`, `str`} or `None`
            Target type for casting. Options:

            - ``"int"``: Cast to integer
            - ``"float"``: Cast to float
            - ``"bool"``: Cast to boolean (compares lowercase to "true")
            - ``"str"``: Cast to string
            - ``None``: No casting, return value as-is

        Returns
        -------
        int, float, bool, str, or None
            The cast value. When the cast fails the original value is
            returned unchanged and the failure is logged at DEBUG level.

        Raises
        ------
        ValueError
            If `cast_type` is not one of the supported options.
        """
        # simply return value
        if value is None or cast_type is None:
            return value
        if cast_type not in get_args(CastType):
            raise ValueError(
                f"Invalid cast_type '{cast_type}'. "
                f"Must be one of: {', '.join(get_args(CastType))}"
            )
        cast_funcs = {
            'int': int,
            'float': float,
            'bool': lambda x: str(x).strip().lower() == "true",
            'str': str,
        }
        # Actually casting
        try:
            return cast_funcs[cast_type](value)
        except (TypeError, ValueError):
            logger.debug(f"Failed to cast value `{value}` to {cast_type}.")
            return value

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _extract_value(self, elem: ET.Element, spec: dict[str, Any]) -> Any:
        """
        Extract value from XML element based on spec.

        Parameters
        ----------
        elem : `ET.Element`
            Source XML element
        spec : `dict` [`str`, `any`]
            Extraction specification with keys like xml_attr, xml_path,
            xml_text, xml_tag, cast

        Returns
        -------
        Any
            Extracted and optionally cast value. None when an `xml_path`
            does not match any child element.

        Raises
        ------
        ValueError
            If the spec names none of xml_tag, xml_path, xml_attr or
            xml_text.
        """
        val = None
        if spec.get(PRNAM.xml_tag):
            # Use element's tag name as value
            val = elem.tag
        elif PRNAM.xml_path in spec:
            # Find child element first
            found = elem.find(spec[PRNAM.xml_path])
            if found is not None:
                if PRNAM.xml_attr in spec:
                    val = found.get(spec[PRNAM.xml_attr])
                else:
                    # text content is used with or without xml_text
                    val = found.text
        elif PRNAM.xml_attr in spec:
            # Get attribute from current element
            val = elem.get(spec[PRNAM.xml_attr])
        elif spec.get(PRNAM.xml_text):
            # Get text content from current element
            val = elem.text
        else:
            raise ValueError(
                f"Invalid extraction spec for <{elem.tag}>: {spec}. "
                f"Must specify xml_tag, xml_path, xml_attr, or xml_text."
            )
        # Apply type casting
        return type(self)._cast_value(val, spec.get(PRNAM.cast))

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _parse_element(self, elem: ET.Element, table: str,
                       parent_id: int | None = None,
                       entity_type: str | None = None) -> int | None:
        """
        Generic recursive element parser.

        Extracts attributes from XML element, inserts into database, then
        recursively parses child elements defined in config.

        Parameters
        ----------
        elem : `ET.Element`
            XML element to parse
        table : `str`
            Config key / table name
        parent_id : `int` or `None`, default `None`
            ID of parent record used as foreign key. None for root elements
            (e.g. VariationArchive)
        entity_type : `str` or `None`, default `None`
            Parent entity type for polymorphic tables (e.g.
            'VariationArchive', 'SimpleAllele', 'ClinicalAssertion').
            Required when parent_id column is 'entity_id'

        Returns
        -------
        int or None
            Inserted row ID if config specifies returns_id, else None

        Raises
        ------
        ValueError
            If there is no config for `table`, if a polymorphic table is
            parsed without `entity_type`, or if a child spec lacks 'xpath'
            or 'table'.
        """
        # Get the config for the current table
        cnf = self.config.get(table)
        if cnf is None:
            logger.error(f"No config found for table: {table}")
            raise ValueError(f"No config found for table: {table}")
        # get trace
        # NOTE updated using CLAUDE
        text_preview = elem.text[:50] if elem.text else None
        logger.trace(
            f"Parsing table: {table}: tag={elem.tag}, attrib={elem.attrib}, "
            f"text={text_preview!r}, config={cnf}."
        )
        # Build values dict
        values = {}
        # =====================================================================
        # "parent_id": "variation_archive_id"
        # =====================================================================
        parent_id_col = cnf.get(PRNAM.prnt_id)
        if parent_id_col is not None:
            # Add entity_type if this is a polymorphic table
            # =================================================================
            # "parent_id": "entity_id" (polymorphic pattern)
            # =================================================================
            if parent_id_col == PRNAM.enty_id:
                if entity_type is None:
                    raise ValueError(
                        f"Polymorphic table {table} requires entity_type but none "
                        "provided"
                    )
                values[PRNAM.enty_type] = entity_type
                values[PRNAM.enty_id] = parent_id
            else:
                values[parent_id_col] = parent_id
        # Extract attributes from XML
        # =====================================================================
        # "attributes": {
        #     "variation_id": {"xml_attr": "VariationID", "cast": "int"},
        #     "record_status": {"xml_path": "./RecordStatus", "xml_text": true}
        # }
        # =====================================================================
        for col, spec in cnf.get(PRNAM.attr, {}).items():
            values[col] = self._extract_value(elem, spec)
        # Insert into database
        # NOTE this is not a commit and can be rolled back (i.e. only in mem)
        # =====================================================================
        # "table_name": "VariationArchive",
        # "columns": ["release_id", "variation_id", ...],
        # "ignore_duplicates": false
        # =====================================================================
        row_id = self._insert(table, cnf, values, elem)
        # Recursively parse children
        # =====================================================================
        # "children": [
        #     {"xpath": "./Comment", "table": "Comment",
        #      "entity_type": "VariationArchive"},
        #     {"xpath": "./ClassifiedRecord/SimpleAllele", "table": "SimpleAllele"}
        # ]
        # =====================================================================
        for child_spec in cnf.get(PRNAM.child, []):
            if PRNAM.xpath not in child_spec or PRNAM.table not in child_spec:
                raise ValueError(
                    f"Child spec for {table} missing 'xpath' or 'table': "
                    f"{child_spec}"
                )
            child_entity_type = child_spec.get(PRNAM.enty_type)
            for child_elem in elem.findall(child_spec[PRNAM.xpath]):
                # recursion
                self._parse_element(
                    child_elem,
                    table=child_spec[PRNAM.table],
                    parent_id=row_id,
                    entity_type=child_entity_type,
                )
        # Return row id
        return row_id

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _insert(self, table: str, cnf: dict[str, Any],
                values: dict[str, Any], elem: ET.Element) -> int | None:
        """
        Insert values into database table.

        Parameters
        ----------
        table : `str`
            Table name (for stats tracking)
        cnf : `dict`
            Table configuration containing table_name, columns, etc.
        values : `dict`
            Column name to value mapping. Columns missing from the mapping
            are inserted as NULL.
        elem : `ET.Element`
            XML element the values were extracted from; only used to build
            the error message when the insert fails.

        Returns
        -------
        int or None
            lastrowid if returns_id is configured, else None

        Raises
        ------
        sqlite3.IntegrityError
            Re-raised after logging a formatted error message.
        """
        # Get the sql insert statement
        sql_dict = cnf[PRNAM.sql]
        cols = sql_dict[PRNAM.cols]
        cols_str = ", ".join(cols)
        sql = (
            f"{sql_dict['statement']} {sql_dict['table_name']} ({cols_str}) "
            f"VALUES ({', '.join('?' * len(cols))})"
        )
        # extract the values in order
        vals = tuple(values.get(c) for c in cols)
        # the insert
        try:
            self.cursor.execute(sql, vals)
        except sqlite3.IntegrityError as e:
            error_msg = self._format_sqlite_error(
                e=e, elem=elem, sql=sql, keys=cols, values=vals,
                accession=self._current_accession,
            )
            logger.error(error_msg)
            raise
        # update the table counts
        self.stats[table] = self.stats.get(table, 0) + 1
        # return row id if needed
        if cnf.get(PRNAM.rtrn_id):
            return self.cursor.lastrowid
        return None

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _is_parsing_complete(self, xml_filename: str,
                             progress_table: str | None) -> bool:
        """
        Check if parsing has already been completed for this XML file.

        Queries the progress tracking table to determine if a completed
        parsing record exists for the specified XML file. This prevents
        re-parsing files that have already been fully processed.

        Parameters
        ----------
        xml_filename : `str`
            Name of XML file being parsed
        progress_table : `str` or `None`
            Name of progress tracking table from config metadata

        Returns
        -------
        bool
            True if a completed progress record exists, False otherwise
            (also False when no progress table is configured).

        Notes
        -----
        A parsing record is considered complete if the `completed_at` column
        in the progress table is not NULL. This timestamp is set by
        `_update_progress` (called with ``completed=True``) after all XML
        records have been inserted but before database validation begins.
        """
        if not progress_table:
            return False
        # query the progress table
        self.cursor.execute(
            f"SELECT completed_at FROM {progress_table} "
            f"WHERE xml_file = ? AND completed_at IS NOT NULL "
            f"ORDER BY id DESC LIMIT 1",
            (xml_filename,)
        )
        row = self.cursor.fetchone()
        if row and row[0]:
            # format timestamp for better readability
            completed_at = row[0].split('.')[0].replace('T', ' ')
            logger.info(
                f"Found completed parsing record for {xml_filename}\n"
                f" Completed at: {completed_at}\n"
                f" Skipping to validation step"
            )
            return True
        # not completed yet
        return False

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _init_resume(self, xml_filename: str, meta: dict[str, Any],
                     resume: bool) -> tuple[str | None, bool, int | None]:
        """
        Initialise resume state and progress tracking.

        Looks up the most recent unfinished progress row that recorded a
        checkpoint and then inserts (and commits) a new progress row for
        the current run.

        Parameters
        ----------
        xml_filename : str
            Name of XML file being parsed
        meta : dict
            Parser metadata; the progress table name is read from it.
        resume : bool
            Whether to attempt resuming from checkpoint

        Returns
        -------
        tuple
            (last_accession, skipping, progress_id). `progress_id` is None
            when no progress table is configured.
        """
        progress_table = meta.get(PRNAM.meta_table)
        last_accession = None
        skipping = False
        progress_id = None
        # Check for existing progress to resume
        if resume and progress_table:
            # Only rows holding a checkpoint are considered. Every run
            # inserts a fresh row with a NULL last_accession, so without
            # this filter a resumed run that aborted before its first batch
            # commit would hide the checkpoint of the run before it.
            self.cursor.execute(
                f"SELECT last_accession FROM {progress_table} "
                f"WHERE xml_file = ? AND completed_at IS NULL "
                f"AND last_accession IS NOT NULL "
                f"ORDER BY id DESC LIMIT 1",
                (xml_filename,)
            )
            row = self.cursor.fetchone()
            if row and row[0]:
                last_accession = row[0]
                skipping = True
                logger.info(f"Resuming from accession: {last_accession}")
            else:
                logger.info("No previous progress found, starting fresh")
        # Record start of parsing
        if progress_table:
            started_at = datetime.now().isoformat()
            self.cursor.execute(
                f"INSERT INTO {progress_table} "
                f"(xml_file, started_at, updated_at) VALUES (?, ?, ?)",
                (xml_filename, started_at, started_at)
            )
            progress_id = self.cursor.lastrowid
            # write to disk
            self.conn.commit()
        return last_accession, skipping, progress_id

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _update_progress(self,
                         meta: dict[str, Any],
                         progress_id: int | None,
                         current_accession: str | None,
                         completed: bool = False,
                         ) -> None:
        """
        Update progress table with the current checkpoint, optionally
        marking completion.

        The update is executed but not committed; the caller commits it
        together with the batch it describes.

        Parameters
        ----------
        meta : `dict` [`str`, `any`]
            Parser metadata; the progress table name is read from it.
        progress_id : `int` or `None`
            Row ID in the progress table.
        current_accession : `str` or `None`
            Current or final accession being processed.
        completed : `bool`, default `False`
            If True, set ``completed_at``
        """
        progress_table = meta.get(PRNAM.meta_table)
        # checking there is a table and an id
        if not (progress_table and progress_id):
            return
        # current time
        now = datetime.now().isoformat()
        # Common clause and params
        set_clause = (
            "last_accession = ?, "
            "records_processed = ?, "
            "updated_at = ?"
        )
        params: list[Any] = [
            current_accession,
            sum(self.stats.values()),
            now,
        ]
        # Add completed clause and param if requested
        if completed:
            set_clause += ", completed_at = ?"
            params.append(now)
        # Final SQL statement and execution
        sql = (
            f"UPDATE {progress_table} SET "
            f"{set_clause} "
            f"WHERE id = ?"
        )
        params.append(progress_id)
        self.cursor.execute(sql, params)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _log_batch_progress(self, batch_count: int) -> None:
        """
        Log batch commit progress.

        Uses the class-level progress logger for normal verbosity, and full
        logger output for debug mode (-vv). Falls back to the module logger
        when no progress logger has been configured.

        Parameters
        ----------
        batch_count : int
            Number of records in current batch
        """
        total_records = sum(self.stats.values())
        non_zero_stats = {
            k: v for k, v in sorted(self.stats.items()) if v > 0
        }
        zero_stats = [
            k for k, v in sorted(self.stats.items()) if v == 0
        ]
        # skip `_` tables
        zero_stats = [k for k in zero_stats if not k.startswith('_')]
        if logger.isEnabledFor(logging.DEBUG):
            # #### Verbose mode (-vv): full logger output
            logger.info(
                f"Batch commit completed\n"
                f" Batch size: {batch_count}\n"
                f" Total rows inserted: {total_records:,}\n"
                f" Active tables ({len(non_zero_stats)}):\n "
                + "\n ".join(
                    f"{k:30s} {v:>12,}" for k, v in non_zero_stats.items()
                )
                + f"\n Inactive tables ({len(zero_stats)}): "
                + ", ".join(zero_stats)
            )
        else:
            # #### Normal mode: in-place terminal update
            # Build output message
            lines = [
                "Batch commit completed",
                f" Batch size: {batch_count}",
                f" Total rows inserted: {total_records:,}",
                f" Active tables ({len(non_zero_stats)}):",
            ]
            for k, v in non_zero_stats.items():
                lines.append(f" {k:30s} {v:>12,}")
            lines.append(
                f" Inactive tables ({len(zero_stats)}): "
                + ", ".join(zero_stats)
            )
            # Log using progress logger (in-place update); the class
            # attribute defaults to None, so fall back to the module logger
            # instead of failing on `None.info`.
            progress_logger = type(self)._progress_logger
            if progress_logger is None:
                progress_logger = logger
            progress_logger.info('\n'.join(lines))

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def _perform_parsing(self,
                         xml_path: str | Path,
                         xml_filename: str,
                         meta: dict[str, Any],
                         batch_size: int,
                         xsd_path: str | Path | None,
                         xsd_strict: bool,
                         resume: bool,
                         enforce_foreign_keys: bool = False,
                         ) -> None:
        """
        Perform the actual XML parsing and database insertion.

        This method contains the core parsing logic: it streams the XML
        file, optionally skips records up to the stored checkpoint, inserts
        every record with its children and commits in batches.

        Parameters
        ----------
        xml_path : `str` or `Path`
            Path to ClinVar XML file
        xml_filename : `str`
            Name of XML file (extracted from path)
        meta : `dict` [`str`, `any`]
            Metadata from config containing progress table info
        batch_size : `int`
            Number of records to commit at once
        xsd_path : `str`, `Path`, or `None`
            Optional path to XSD schema for XML validation
        xsd_strict : `bool`
            If False, allows XML elements not defined in XSD
        resume : `bool`
            Whether to attempt resuming from last checkpoint
        enforce_foreign_keys : `bool`, default `False`
            Whether SQLite should enforce foreign keys while inserting.

        Raises
        ------
        ValueError
            If a record has no element at the configured accession path, or
            if the checkpoint accession to resume from never occurs in the
            XML file.
        """
        # Foreign key enforcement is off by default in SQLite, so the
        # requested state is set explicitly in both directions. With
        # enforcement off the keys are still inserted, just not validated
        # until the end (faster).
        fk_state = "ON" if enforce_foreign_keys else "OFF"
        self.cursor.execute(f"PRAGMA foreign_keys = {fk_state}")
        # current_accession = None
        record_element = meta.get(PRNAM.meta_record)
        accession_attr = meta.get(PRNAM.meta_acc)
        accession_path = meta.get('accession_path', None)
        # Initialise resume state
        last_accession, skipping, progress_id = self._init_resume(
            xml_filename, meta, resume
        )
        skip_count = 0
        batch_count = 0
        # use iterparse for memory efficiency
        with open_xml_file(xml_path, xsd_path=xsd_path,
                           strict=xsd_strict,
                           ) as file_handle:
            # iterating over the records
            context = ET.iterparse(file_handle, events=("start", "end"))
            event, root = next(context)
            # Get root element
            if not skipping and event == 'start':
                self._parse_element(root, table=root.tag, parent_id=None)
            root.clear()
            # buffering the full record
            for event, elem in context:
                # run until you find the end of the record so everything is
                # buffered
                # skipping if not record_element and end event
                if event != 'end' or elem.tag != record_element:
                    continue
                # Get accession for progress tracking
                if accession_path is None:
                    self._current_accession = elem.get(accession_attr)
                else:
                    acc_elem = elem.find(accession_path)
                    if acc_elem is None:
                        raise ValueError(
                            f"Record <{elem.tag}> has no element matching "
                            f"accession_path '{accession_path}'."
                        )
                    self._current_accession = acc_elem.get(accession_attr)
                # Handle resume - skip until we reach last checkpoint
                if skipping:
                    if self._current_accession == last_accession:
                        skipping = False
                        logger.info(
                            f"Reached checkpoint {self._current_accession}, "
                            f"resuming parsing (skipped {skip_count:,} records)"
                        )
                    else:
                        skip_count += 1
                        if skip_count % 1000 == 0:
                            print(f"\rSkipping to checkpoint: {skip_count:>12,} "
                                  f"records", end='', flush=True)
                    # the checkpoint record itself was already committed
                    elem.clear()
                    root.clear()
                    continue
                # Parse the complete record
                self._parse_element(elem, table=record_element, parent_id=None)
                batch_count += 1
                elem.clear()
                # Detach finished records from the root as well, otherwise
                # the root keeps a reference to every (emptied) record and
                # memory grows with the size of the file.
                root.clear()
                # Batch commit
                if batch_count >= batch_size:
                    self._update_progress(meta, progress_id,
                                          self._current_accession)
                    # Now commit to disk
                    self.conn.commit()
                    # NOTE add method
                    # Print progress
                    self._log_batch_progress(batch_count)
                    batch_count = 0
        # A checkpoint that never showed up means every record was skipped;
        # marking such a run as completed would hide an incomplete database.
        if skipping:
            msg = (
                f"Checkpoint accession '{last_accession}' was not found in "
                f"{xml_filename} ({skip_count:,} records skipped); nothing "
                f"was parsed. Rebuild the database or re-run with "
                f"resume=False."
            )
            logger.error(msg)
            raise ValueError(msg)
        # Final commit
        self._update_progress(
            meta, progress_id, self._current_accession, completed=True,
        )
        self.conn.commit()
        logger.info("All records inserted.")
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
def parse_arguments():
    """
    Parse command-line arguments for database parsing.

    Returns
    -------
    argparse.Namespace
        Parsed command-line arguments containing:

        - db : Database file path (required)
        - xml : XML file path (required)
        - xsd : XSD schema file path (optional, default None)
        - rcv : True when ``--rcv`` is given
        - vcv : True when ``--vcv`` is given
        - batch_size : Number of records to commit at once
        - enforce_fk : Whether to enforce foreign keys during parsing
        - no_validation : True by default; False when ``--no-validation``
          is given (i.e. the value answers "run the validation?")
        - count_duplicates : Whether to count duplicate rows
        - verbose : Verbosity level (0-3)

    Notes
    -----
    argparse exits the process (``SystemExit``) when ``--db`` or ``--xml``
    is missing.
    """
    parser = argparse.ArgumentParser(
        description='Parse ClinVar XML files and populate SQLite database'
    )
    # Both paths are mandatory: let argparse report a missing one instead of
    # failing later on a None path.
    parser.add_argument(
        '--db',
        required=True,
        help='SQLite database path'
    )
    parser.add_argument(
        '--xml',
        required=True,
        help='The path to the ClinVar XML file'
    )
    parser.add_argument(
        '--xsd',
        default=None,
        help='The path to the ClinVar XSD file (Optional)'
    )
    parser.add_argument(
        '--rcv',
        action='store_true',
        help='Create RCV schema'
    )
    parser.add_argument(
        '--vcv',
        action='store_true',
        help='Create VCV schema'
    )
    parser.add_argument(
        '--batch-size',
        type=int,
        default=10,
        help=(
            'Number of top level records to commit at once (default: 10). '
        )
    )
    parser.add_argument(
        '--enforce-fk',
        action='store_true',
        help='Enforce foreign keys during parsing (slower but safer)'
    )
    # store_false: the attribute stays True unless the flag is passed
    parser.add_argument(
        '--no-validation',
        action='store_false',
        help='To skip post-build foreign key database validation'
    )
    parser.add_argument(
        '--count-duplicates',
        action='store_true',
        help='Count duplicated rows per table'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        default=0,
        help='Increase verbosity (-v for INFO, -vv for DEBUG, -vvv TRACE)'
    )
    # return
    return parser.parse_args()
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ # NOTE add XSD argparse processing
def main():
    """
    Main entry point for ClinVar XML parser.

    Parses command-line arguments, loads the JSON parser configuration (the
    RCV one when ``--rcv`` is given, the VCV one otherwise), initializes the
    parser, processes the ClinVar XML file into a SQLite database and prints
    a summary of the inserted rows per table.
    """
    args = parse_arguments()
    # Configure logging based on verbosity
    configure_logging(args.verbose)
    logger.info("Starting ClinVar XML parser")
    # read config file
    cnfpath = cnf_tools.check_environ()
    json_dir = os.path.join(cnfpath, 'parser')
    if args.rcv:
        json_file = os.path.join(json_dir, PRNAM.rcv)
    else:
        json_file = os.path.join(json_dir, PRNAM.vcv)
    # check directories
    for path in (args.db, args.xml, json_file):
        _check_directory(Path(path).parent)
    _check_directory_writable(args.db)
    _check_directory_readable(args.xml)
    _check_directory_readable(json_file)
    # read json
    logger.info(f"Loading configuration from: {json_file}")
    with open(json_file, 'r', encoding='utf-8') as f:
        config = json.load(f)
    logger.info(f"Loaded configuration for {len(config)} element types")
    # The parsing step
    logger.info("Initializing ClinVar parser")
    parser = ClinVarParser(config)
    parser.parse_file(
        xml_path=args.xml,
        db_path=args.db,
        batch_size=args.batch_size,
        # `--no-validation` is a store_false flag: the attribute is True
        # unless the flag was passed, so it can be forwarded directly.
        validate_foreign_keys=args.no_validation,
        enforce_foreign_keys=args.enforce_fk,
        # forward the schema given with --xsd (None when omitted)
        xsd_path=args.xsd,
        count_duplicates=args.count_duplicates,
    )
    # Print summary
    rule = "=" * parser._LINE_LEN
    print("\n" + rule)
    print("PARSING SUMMARY")
    print(rule)
    print(f"XML File: {args.xml}")
    print(f"Database: {args.db}")
    print(f"Total Records: {sum(parser.stats.values())}")
    print("\nRecords by Table:")
    for table_name, count in sorted(parser.stats.items()):
        if count > 0:
            print(f" {table_name:30s} {count:>10,}")
    print(rule)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
if __name__ == "__main__":
    # Script entry point: run main() and map its outcome onto an exit code
    # (0 on success, 1 on user interrupt or any other error).
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        exit_code = 1
    except Exception as err:
        print(f"Error: {err}")
        exit_code = 1
    else:
        exit_code = 0
    sys.exit(exit_code)