"""
ClinVar XML to SQLite database parser
This module provides a comprehensive parser for converting ClinVar XML files
into SQLite databases. It implements a configuration-driven architecture that
supports both RCV (Reference ClinVar) and VCV (Variation
ClinVar) XML formats through JSON-based table and column specifications.
The parser is implemented using iterative parsing and batch commits, minimising
memory usage. Progress tracking is enabled through a SQL table, allowing for
restarts from the last committed record.
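
Example invocation (illustrative; the module path and file names are
placeholders)::

    python -m clinvar_build.parse_clinvar --vcv \
        --xml ClinVarVCVRelease.xml.gz --db clinvar_vcv.db \
        --batch-size 50 -v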
"""
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# imports
import os
import sys
import json
import sqlite3
import logging
import argparse
import xml.etree.ElementTree as ET
import clinvar_build.utils.config_tools as cnf_tools
from datetime import datetime
from typing import (
Any,
Literal,
get_args,
)
from pathlib import Path
from clinvar_build.errors import (
is_type,
)
from clinvar_build.utils.general import (
_check_directory,
_check_directory_readable,
_check_directory_writable,
)
from clinvar_build.utils.parser_tools import (
configure_logging,
SQLiteParser,
open_xml_file,
)
from clinvar_build.constants import (
ParserNames as PRNAM,
)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# constants
CastType = Literal["int", "float", "bool", "str"]
# initiating a logger
logger = logging.getLogger(__name__)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class ClinVarParser(SQLiteParser):
"""
Parser for ClinVar XML files.
Parameters
----------
config : dict[str, Any]
A dictionary with instructions for parsing an XML file into a SQLite
database.
Attributes
----------
conn : `sqlite3.Connection` or `None`
Active SQLite connection, or None if not connected.
cursor : `sqlite3.Cursor` or `None`
Cursor for executing SQL statements, or None if not connected.
stats : `dict` [`str`, `int`]
Per-table counts of rows inserted during parsing
Class Attributes
----------------
_SQL_COLS_PATTERN : re.Pattern
Compiled regex to extract column names from SQL INSERT statements
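
Examples
--------
A minimal construction sketch (the config path and file names are
placeholders)::

    import json

    with open("parser/vcv.json", encoding="utf-8") as fh:
        config = json.load(fh)

    parser = ClinVarParser(config)
    parser.parse_file(xml_path="ClinVarVCVRelease.xml.gz",
                      db_path="clinvar.db", batch_size=50)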
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
_progress_logger = None
_progress_handler = None
_current_accession = None
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def parse_file(self, xml_path: str | Path, db_path: str | Path,
batch_size: int = 10,
validate_foreign_keys: bool = True,
enforce_foreign_keys: bool = False,
xsd_path: str | Path | None = None,
xsd_strict: bool = False,
resume: bool = True,
count_duplicates: bool = False
) -> None:
"""
Parse ClinVar XML file into SQLite database.
Parameters
----------
xml_path : `str` or `Path`
Path to ClinVar XML file
db_path : `str` or `Path`
Path to SQLite database file
batch_size : `int`, default 10
Number of top-level records (including children) to commit at once.
validate_foreign_keys : `bool`, default `True`
Whether to validate foreign key integrity after parsing
enforce_foreign_keys : `bool`, default `False`
Whether to enforce foreign keys during parsing (slower but catches
errors immediately). If False, foreign keys are only validated
after parsing completes.
xsd_path : `str`, `Path`, or `None`, default `None`
Optional path to XSD schema for XML validation
xsd_strict : `bool`, default `False`
If False, allows XML elements not defined in XSD
resume : `bool`, default `True`
If True, attempt to resume from last checkpoint. If no
checkpoint exists, starts fresh. If False, always starts
fresh (existing data may cause constraint violations)
count_duplicates : `bool`, default `False`
After building the database, print potential duplicate rows per
table.
"""
# #### check input
is_type(db_path, (type(None), str, Path))
is_type(xml_path, (type(None), str, Path))
is_type(xsd_path, (type(None), str, Path))
is_type(xsd_strict, bool)
is_type(batch_size, int)
is_type(resume, bool)
# start and set constants
logger.info(f"Starting to parse file: {xml_path}")
xml_filename = Path(xml_path).name
meta: dict[str, Any] = self.config.get(PRNAM.meta)
# #### open connection
with self._connection(db_path):
# #### check if parsing already completed
parsing_skipped = resume and self._is_parsing_complete(
xml_filename, meta.get(PRNAM.meta_table))
# #### perform parsing
if not parsing_skipped:
self._perform_parsing(
xml_path=xml_path, xml_filename=xml_filename,
meta=meta, batch_size=batch_size,
xsd_path=xsd_path, xsd_strict=xsd_strict,
resume=resume,
enforce_foreign_keys=enforce_foreign_keys,
)
# #### count duplicates (before validation)
if count_duplicates:
self.count_duplicates()
# #### validation and stats (common to both paths)
self.cursor.execute("PRAGMA foreign_keys = ON")
if validate_foreign_keys:
try:
_ = self.validate_database()
except ValueError as e:
logger.error(
f"Foreign key validation failed:\n"
f" {e}\n"
f" Re-run with -vv to see detailed errors during "
f"parsing."
)
raise
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@staticmethod
def _cast_value(value: Any, cast_type: CastType | None) -> Any:
"""
Cast value to specified type.
Parameters
----------
value : `Any`
Value to cast
cast_type : {"int", "float", "bool", "str"} or `None`
Target type for casting. Options:
- ``"int"``: Cast to integer
- ``"float"``: Cast to float
- ``"bool"``: Cast to boolean (compares lowercase to "true")
- ``"str"``: Cast to string
- ``None``: No casting, return value as-is
Returns
-------
int, float, bool, str, or None
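
Examples
--------
Illustrative calls (``_cast_value`` is a static method; a failed cast
returns the value unchanged):

>>> ClinVarParser._cast_value("42", "int")
42
>>> ClinVarParser._cast_value("True", "bool")
True
>>> ClinVarParser._cast_value("n/a", "int")
'n/a'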
"""
# simply return value
if value is None or cast_type is None:
return value
if cast_type not in get_args(CastType):
raise ValueError(
f"Invalid cast_type '{cast_type}'. "
f"Must be one of: {', '.join(get_args(CastType))}"
)
cast_funcs = {
'int': int,
'float': float,
'bool': lambda x: str(x).strip().lower() == "true",
'str': str,
}
# Actually casting
try:
return cast_funcs[cast_type](value)
except (TypeError, ValueError):
logger.debug(f"Failed to cast value `{value}` to {cast_type}.")
return value
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _extract_value(self, elem: ET.Element, spec: dict[str, Any]) -> Any:
"""
Extract value from XML element based on spec.
Parameters
----------
elem : `ET.Element`
Source XML element
spec : `dict` [`str`, `any`]
Extraction specification with keys like xml_attr, xml_path,
xml_text, xml_tag, cast
Returns
-------
Any
Extracted and optionally cast value
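
Examples
--------
Illustrative only, assuming the spec keys are the literal JSON config
keys (``xml_attr``, ``xml_path``, ``xml_text``) and that ``parser`` is
an existing ``ClinVarParser`` instance:

>>> import xml.etree.ElementTree as ET
>>> gene = ET.fromstring('<Gene Symbol="BRCA1"><Name>BRCA1</Name></Gene>')
>>> parser._extract_value(gene, {"xml_attr": "Symbol"})
'BRCA1'
>>> parser._extract_value(gene, {"xml_path": "./Name", "xml_text": True})
'BRCA1'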
"""
val = None
if spec.get(PRNAM.xml_tag):
# Use element's tag name as value
val = elem.tag
elif PRNAM.xml_path in spec:
# Find child element first
found = elem.find(spec[PRNAM.xml_path])
if found is not None:
if PRNAM.xml_attr in spec:
val = found.get(spec[PRNAM.xml_attr])
else:
val = found.text
elif PRNAM.xml_attr in spec:
# Get attribute from current element
val = elem.get(spec[PRNAM.xml_attr])
elif spec.get(PRNAM.xml_text):
# Get text content from current element
val = elem.text
else:
raise ValueError(
f"Invalid extraction spec for <{elem.tag}>: {spec}. "
f"Must specify xml_tag, xml_path, xml_attr, or xml_text."
)
# Apply type casting
return type(self)._cast_value(val, spec.get(PRNAM.cast))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _parse_element(self, elem: ET.Element, table: str,
parent_id: int | None = None,
entity_type: str | None = None) -> int | None:
"""
Generic recursive element parser.
Extracts attributes from XML element, inserts into database,
then recursively parses child elements defined in config.
Parameters
----------
elem : `ET.Element`
XML element to parse
table : `str`
Config key / table name
parent_id : `int` or `None`, default `None`
ID of parent record used as foreign key. None for root elements
(e.g. VariationArchive)
entity_type : `str` or `None`, default `None`
Parent entity type for polymorphic tables (e.g. 'VariationArchive',
'SimpleAllele', 'ClinicalAssertion'). Required when parent_id
column is 'entity_id'
Returns
-------
int or None
Inserted row ID if config specifies returns_id, else None
"""
# Get the config for the current table
cnf = self.config.get(table)
if cnf is None:
logger.error(f"No config found for table: {table}")
raise ValueError(f"No config found for table: {table}")
# get trace
# NOTE updated using CLAUDE
text_preview = elem.text[:50] if elem.text else None
logger.trace(
f"Parsing table: {table}: tag={elem.tag}, attrib={elem.attrib}, "
f"text={text_preview!r}, config={cnf}."
)
# Build values dict
values = {}
# =====================================================================
# "parent_id": "variation_archive_id"
# =====================================================================
parent_id_col = cnf.get(PRNAM.prnt_id)
if parent_id_col is not None:
# Add entity_type if this is a polymorphic table
# =================================================================
# "parent_id": "entity_id" (polymorphic pattern)
# =================================================================
if parent_id_col == PRNAM.enty_id:
if entity_type is None:
raise ValueError(
f"Polymorphic table {table} requires entity_type but none "
"provided"
)
values[PRNAM.enty_type] = entity_type
values[PRNAM.enty_id] = parent_id
else:
values[parent_id_col] = parent_id
# Extract attributes from XML
# =====================================================================
# "attributes": {
# "variation_id": {"xml_attr": "VariationID", "cast": "int"},
# "record_status": {"xml_path": "./RecordStatus", "xml_text": true}
# }
# =====================================================================
for col, spec in cnf.get(PRNAM.attr, {}).items():
values[col] = self._extract_value(elem, spec)
# Insert into database
# NOTE this is not a commit and can be rolled back (i.e. only in mem)
# =====================================================================
# "table_name": "VariationArchive",
# "columns": ["release_id", "variation_id", ...],
# "ignore_duplicates": false
# =====================================================================
row_id = self._insert(table, cnf, values, elem)
# Recursively parse children
# =====================================================================
# "children": [
# {"xpath": "./Comment", "table": "Comment",
# "entity_type": "VariationArchive"},
# {"xpath": "./ClassifiedRecord/SimpleAllele", "table": "SimpleAllele"}
# ]
# =====================================================================
for child_spec in cnf.get(PRNAM.child, []):
if PRNAM.xpath not in child_spec or PRNAM.table not in child_spec:
raise ValueError(
f"Child spec for {table} missing 'xpath' or 'table': "
f"{child_spec}"
)
child_entity_type = child_spec.get(PRNAM.enty_type)
for child_elem in elem.findall(child_spec[PRNAM.xpath]):
# recursion
self._parse_element(
child_elem,
table=child_spec[PRNAM.table],
parent_id=row_id,
entity_type=child_entity_type,
)
# Return row id
return row_id
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _insert(self, table: str, cnf: dict[str, Any],
values: dict[str, Any],
elem: ET.Element) -> int | None:
"""
Insert values into database table.
Parameters
----------
table : `str`
Table name (for stats tracking)
cnf : `dict`
Table configuration containing table_name, columns, etc.
values : `dict`
Column name to value mapping
elem : `ET.Element`
Source XML element, used for error reporting if the insert fails
Returns
-------
int or None
lastrowid if returns_id is configured, else None
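
Notes
-----
For illustration only: a config whose SQL section has
``statement = "INSERT INTO"``, ``table_name = "Comment"`` and
``columns = ["entity_type", "entity_id", "text"]`` would produce
roughly::

    INSERT INTO Comment (entity_type, entity_id, text) VALUES (?, ?, ?)

The real statement, table and column names all come from the JSON
configuration.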
"""
# Get the sql insert statement
sql_dict = cnf[PRNAM.sql]
cols = sql_dict[PRNAM.cols]
cols_str = ", ".join(cols)
sql = (
f"{sql_dict['statement']} {sql_dict['table_name']} ({cols_str}) "
f"VALUES ({', '.join('?' * len(cols))})"
)
# extract the values in order
vals = tuple(values.get(c) for c in cols)
# the insert
try:
self.cursor.execute(sql, vals)
except sqlite3.IntegrityError as e:
error_msg = self._format_sqlite_error(
e=e,
elem=elem,
sql=sql,
keys=cols,
values=vals,
accession=self._current_accession,
)
logger.error(error_msg)
raise
# update the table counts
self.stats[table] = self.stats.get(table, 0) + 1
# return row id if needed
if cnf.get(PRNAM.rtrn_id):
return self.cursor.lastrowid
return None
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _is_parsing_complete(self, xml_filename: str,
progress_table: str | None) -> bool:
"""
Check if parsing has already been completed for this XML file.
Queries the progress tracking table to determine if a completed
parsing record exists for the specified XML file. This prevents
re-parsing files that have already been fully processed.
Parameters
----------
xml_filename : `str`
Name of XML file being parsed
progress_table : `str` or `None`
Name of progress tracking table from config metadata
Returns
-------
bool
True if a completed progress record exists, False otherwise
Notes
-----
A parsing record is considered complete if the `completed_at`
column in the progress table is not NULL. This timestamp is set
by `_complete_progress` after all XML records have been inserted
but before database validation begins.
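
An illustrative shape for the progress table (the actual definition
comes from the database schema, not from this module)::

    CREATE TABLE parse_progress (
        id INTEGER PRIMARY KEY,
        xml_file TEXT,
        started_at TEXT,
        updated_at TEXT,
        last_accession TEXT,
        records_processed INTEGER,
        completed_at TEXT
    );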
"""
if not progress_table:
return False
# query the progress table
self.cursor.execute(
f"SELECT completed_at FROM {progress_table} "
f"WHERE xml_file = ? AND completed_at IS NOT NULL "
f"ORDER BY id DESC LIMIT 1",
(xml_filename,)
)
row = self.cursor.fetchone()
if row and row[0]:
# format timestamp for better readability
completed_at = row[0].split('.')[0].replace('T', ' ')
logger.info(
f"Found completed parsing record for {xml_filename}\n"
f" Completed at: {completed_at}\n"
f" Skipping to validation step"
)
return True
# not completed yet
return False
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _init_resume(self, xml_filename: str, meta: dict[str, Any],
resume: bool) -> tuple[str | None, bool, int | None]:
"""
Initialise resume state and progress tracking.
Parameters
----------
xml_filename : str
Name of XML file being parsed
meta : dict
Metadata from _get_parse_meta()
resume : bool
Whether to attempt resuming from checkpoint
Returns
-------
tuple
(last_accession, skipping, progress_id)
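For illustration, resuming an interrupted run might return something
like ``("VCV000012345", True, 3)``, while a fresh run with progress
tracking enabled returns ``(None, False, <new progress row id>)``.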
"""
progress_table = meta.get(PRNAM.meta_table)
last_accession = None
skipping = False
progress_id = None
# Check for existing progress to resume
if resume and progress_table:
self.cursor.execute(
f"SELECT last_accession FROM {progress_table} "
f"WHERE xml_file = ? AND completed_at IS NULL "
f"ORDER BY id DESC LIMIT 1",
(xml_filename,)
)
row = self.cursor.fetchone()
if row and row[0]:
last_accession = row[0]
skipping = True
logger.info(f"Resuming from accession: {last_accession}")
elif row and row[0] is None:
logger.debug(
"The progress table did not record the last accession code "
"and hence does not know which records to skip."
)
else:
logger.info("No previous progress found, starting fresh")
# Record start of parsing
if progress_table:
started_at = datetime.now().isoformat()
self.cursor.execute(
f"INSERT INTO {progress_table} "
f"(xml_file, started_at, updated_at) VALUES (?, ?, ?)",
(xml_filename, started_at, started_at)
)
progress_id = self.cursor.lastrowid
# write to disk
self.conn.commit()
return last_accession, skipping, progress_id
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _update_progress(self, meta: dict[str, Any], progress_id: int | None,
current_accession: str | None,
completed: bool = False,
) -> None:
"""
Update progress table with the current checkpoint, optionally marking
completion.
Parameters
----------
meta : `dict` [`str`, `any`]
Metadata from ``_get_parse_meta()``.
progress_id : `int` or `None`
Row ID in the progress table.
current_accession : `str` or `None`
Current or final accession being processed.
completed : `bool`, default `False`
If True, set ``completed_at``
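
Notes
-----
For illustration, the generated statement looks roughly like::

    UPDATE <progress_table> SET last_accession = ?,
        records_processed = ?, updated_at = ? WHERE id = ?

with ``completed_at = ?`` appended to the SET clause when ``completed``
is True. ``records_processed`` is the running total of inserted rows,
``sum(self.stats.values())``.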
"""
progress_table = meta.get(PRNAM.meta_table)
# checking there is a table and an id
if not (progress_table and progress_id):
return
# current time
now = datetime.now().isoformat()
# Common clause and params
set_clause = (
"last_accession = ?, "
"records_processed = ?, "
"updated_at = ?"
)
params: list[Any] = [
current_accession,
sum(self.stats.values()),
now,
]
# Add completed clause and param if requested
if completed:
set_clause += ", completed_at = ?"
params.append(now)
# Final SQL statement and execution
sql = (
f"UPDATE {progress_table} SET "
f"{set_clause} "
f"WHERE id = ?"
)
params.append(progress_id)
self.cursor.execute(sql, params)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _log_batch_progress(self, batch_count: int) -> None:
"""
Log batch commit progress.
Uses in-place terminal updates for normal verbosity, and full
logger output for debug mode (-vv).
Parameters
----------
batch_count : int
Number of records in current batch
"""
total_records = sum(self.stats.values())
non_zero_stats = {
k: v
for k, v in sorted(self.stats.items())
if v > 0
}
zero_stats = [
k
for k, v in sorted(self.stats.items())
if v == 0
]
# skip `_` tables
zero_stats = [k for k in zero_stats if not k.startswith('_')]
if logger.isEnabledFor(logging.DEBUG):
# #### Verbose mode (-vv): full logger output
logger.info(
f"Batch commit completed\n"
f" Batch size: {batch_count}\n"
f" Total rows inserted: {total_records:,}\n"
f" Active tables ({len(non_zero_stats)}):\n " +
"\n ".join(
f"{k:30s} {v:>12,}"
for k, v in non_zero_stats.items()
) +
f"\n Inactive tables ({len(zero_stats)}): " +
", ".join(zero_stats)
)
else:
# #### Normal mode: in-place terminal update
# Build output message
lines = [
"Batch commit completed",
f" Batch size: {batch_count}",
f" Total rows inserted: {total_records:,}",
f" Active tables ({len(non_zero_stats)}):",
]
for k, v in non_zero_stats.items():
lines.append(f" {k:30s} {v:>12,}")
lines.append(
f" Inactive tables ({len(zero_stats)}): " +
", ".join(zero_stats)
)
# Log using progress logger (in-place update)
type(self)._progress_logger.info('\n'.join(lines))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _perform_parsing(self, xml_path: str | Path, xml_filename: str,
meta: dict[str, Any], batch_size: int,
xsd_path: str | Path | None,
xsd_strict: bool, resume: bool,
enforce_foreign_keys: bool = False,
) -> None:
"""
Perform the actual XML parsing and database insertion.
This method contains the core parsing logic.
Parameters
----------
xml_path : `str` or `Path`
Path to ClinVar XML file
xml_filename : `str`
Name of XML file (extracted from path)
meta : `dict` [`str`, `any`]
Metadata from config containing progress table info
batch_size : `int`
Number of records to commit at once
xsd_path : `str`, `Path`, or `None`
Optional path to XSD schema for XML validation
xsd_strict : `bool`
If False, allows XML elements not defined in XSD
resume : `bool`
Whether to attempt resuming from last checkpoint
enforce_foreign_keys : `bool`, default `False`
Whether to enforce foreign keys during parsing; if False, foreign
keys are only validated after parsing completes
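
Notes
-----
The processing loop is roughly: stream ``start``/``end`` events with
``ET.iterparse``, parse one complete record element per ``end`` event,
clear it to free memory, and after every ``batch_size`` records write a
checkpoint via ``_update_progress`` followed by ``conn.commit()``. With
``resume=True``, records are skipped until the checkpointed accession
from the previous run is reached.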
"""
# disable foreign key enforcement during parsing for speed
# keys are still inserted, just not validated until the end
if not enforce_foreign_keys:
self.cursor.execute("PRAGMA foreign_keys = OFF")
record_element = meta.get(PRNAM.meta_record)
accession_attr = meta.get(PRNAM.meta_acc)
accession_path = meta.get('accession_path', None)
# Initialise resume state
last_accession, skipping, progress_id = self._init_resume(
xml_filename, meta, resume
)
skip_count = 0
batch_count = 0
# use iterparse for memory efficiency
with open_xml_file(xml_path, xsd_path=xsd_path, strict=xsd_strict,
) as file_handle:
# iterating over the records
context = ET.iterparse(file_handle, events=("start", "end"))
event, root = next(context)
# Get root element
if not skipping and event == 'start':
self._parse_element(root, table=root.tag, parent_id=None)
root.clear()
# buffering the full record
for event, elem in context:
# run until the end of the record element so the full record is
# buffered; skip anything that is not an 'end' event for the
# record element
if event != 'end' or elem.tag != record_element:
continue
# Get accession for progress tracking
if accession_path is None:
self._current_accession = elem.get(accession_attr)
else:
self._current_accession = elem.find(accession_path).get(accession_attr)
# Handle resume - skip until we reach last checkpoint
if skipping:
if self._current_accession == last_accession:
skipping = False
logger.info(
f"Reached checkpoint {self._current_accession}, "
f"resuming parsing (skipped {skip_count:,} records)"
)
else:
skip_count += 1
if skip_count % 1000 == 0:
print(f"\rSkipping to checkpoint: {skip_count:>12,} "
f"records", end='', flush=True)
elem.clear()
continue
# Parse the complete record
self._parse_element(elem, table=record_element, parent_id=None)
batch_count += 1
elem.clear()
# Batch commit
if batch_count >= batch_size:
self._update_progress(meta, progress_id,
self._current_accession)
# Now commit to disk
self.conn.commit()
# NOTE add method
# Print progress
self._log_batch_progress(batch_count)
batch_count = 0
# Final commit
self._update_progress(
meta, progress_id, self._current_accession, completed=True,
)
self.conn.commit()
logger.info("All records inserted.")
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
def parse_arguments():
"""
Parse command-line arguments for database parsing.
Returns
-------
argparse.Namespace
Parsed command-line arguments containing:
- db : Database file path
- xml : XML file path
- xsd : XSD schema file path (optional)
- rcv : Whether to create RCV schema
- vcv : Whether to create VCV schema
- batch_size : Number of records to commit at once
- enforce_fk : Whether to enforce foreign keys during parsing
- no_validation : Whether to skip post-build validation
- count_duplicates : Whether to count duplicate rows
- verbose : Verbosity level (0-3)
"""
parser = argparse.ArgumentParser(
description='Parse ClinVar XML files and populate SQLite database'
)
parser.add_argument(
'--db',
help='SQLite database path'
)
parser.add_argument(
'--xml',
help='The path to the ClinVar XML file'
)
parser.add_argument(
'--xsd',
default=None,
help='The path to the ClinVar XSD file (Optional)'
)
parser.add_argument(
'--rcv',
action='store_true',
help='Create RCV schema'
)
parser.add_argument(
'--vcv',
action='store_true',
help='Create VCV schema'
)
parser.add_argument(
'--batch-size',
type=int,
default=10,
help=(
'Number of top-level records to commit at once (default: 10)'
)
)
parser.add_argument(
'--enforce-fk',
action='store_true',
help='Enforce foreign keys during parsing (slower but safer)'
)
parser.add_argument(
'--no-validation',
action='store_false',
help='Skip post-build foreign key validation of the database'
)
parser.add_argument(
'--count-duplicates',
action='store_true',
help='Count duplicated rows per table'
)
parser.add_argument(
'-v', '--verbose',
action='count',
default=0,
help='Increase verbosity (-v for INFO, -vv for DEBUG, -vvv for TRACE)'
)
# return
return parser.parse_args()
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# NOTE add XSD argparse processing
def main():
"""
Main entry point for ClinVar XML parser.
Parses command-line arguments, loads configuration, initializes parser,
and processes ClinVar XML file into SQLite database.
"""
args = parse_arguments()
# Configure logging based on verbosity
configure_logging(args.verbose)
logger.info("Starting ClinVar XML parser")
# read config file
cnfpath = cnf_tools.check_environ()
json_dir = os.path.join(cnfpath, 'parser')
if args.rcv:
json_file = os.path.join(json_dir, PRNAM.rcv)
else:
json_file = os.path.join(json_dir, PRNAM.vcv)
# check directories
for d in (args.db, args.xml, json_file):
_check_directory(Path(d).parent)
_check_directory_writable(args.db)
_check_directory_readable(args.xml)
_check_directory_readable(json_file)
# read json
logger.info(f"Loading configuration from: {json_file}")
with open(json_file, 'r', encoding='utf-8') as f:
config = json.load(f)
logger.info(f"Loaded configuration for {len(config)} element types")
# The parsing step
logger.info("Initializing ClinVar parser")
parser = ClinVarParser(config)
parser.parse_file(
xml_path=args.xml,
db_path=args.db,
batch_size=args.batch_size,
validate_foreign_keys=args.no_validation,
enforce_foreign_keys=args.enforce_fk,
count_duplicates=args.count_duplicates,
)
# Print summary
print("\n" + "="*parser._LINE_LEN)
print("PARSING SUMMARY")
print("="*70)
print(f"XML File: {args.xml}")
print(f"Database: {args.db}")
print(f"Total Records: {sum(parser.stats.values())}")
print("\nRecords by Table:")
for table_name, count in sorted(parser.stats.items()):
if count > 0:
print(f" {table_name:30s} {count:>10,}")
print("="*parser._LINE_LEN)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
if __name__ == "__main__":
try:
main()
sys.exit(0)
except KeyboardInterrupt:
print("\nInterrupted by user")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)