"""HGNC TSV file parser for secondary-to-primary identifier mappings.
This parser extracts:
1. ID-to-ID mappings: withdrawn/merged HGNC IDs -> current HGNC IDs
2. Label-to-label mappings: previous/alias symbols -> current symbols
Uses SSSOM-compliant MappingSet classes with cardinality computation.
"""
from __future__ import annotations
from pathlib import Path
import polars as pl
from sssom_schema import Mapping
from pysec2pri.parsers.base import (
WITHDRAWN_ENTRY,
WITHDRAWN_ENTRY_LABEL,
BaseParser,
Sec2PriMappingSet,
)
# HGNC column names (case-insensitive matching used)
HGNC_ID = "hgnc_id"
SYMBOL = "symbol"
ALIAS_SYMBOL = "alias_symbol"
PREV_SYMBOL = "prev_symbol"
STATUS = "status"
# Merged info column has different naming variants across HGNC file versions
MERGED_INFO_PATTERNS = [
"merged_into_report(i.e. hgnc_id/symbol/status)",
"merged_into_report(i.e hgnc_id/symbol/status)",
"merged_into_report(s) (i.e hgnc_id|symbol|status)",
]
[docs]
class HGNCParser(BaseParser):
"""Parser for HGNC TSV files using Polars for memory efficiency.
Extracts secondary-to-primary HGNC identifier mappings and
symbol mappings from HGNC withdrawn and complete set files.
Returns:
- IdMappingSet for ID-to-ID mappings (withdrawn/merged IDs)
- LabelMappingSet for symbol mappings (alias/previous symbols)
"""
datasource_name = "hgnc"
def __init__(
self,
version: str | None = None,
show_progress: bool = True,
):
"""Initialize the HGNC parser.
Args:
version: Version/release identifier for the datasource.
show_progress: Whether to show progress bars during parsing.
"""
super().__init__(version=version, show_progress=show_progress)
@property
def withdrawn_source_url(self) -> str:
"""Get the withdrawn file download URL from config."""
return self.get_download_url("withdrawn") or ""
@property
def complete_set_source_url(self) -> str:
"""Get the complete set download URL from config."""
return self.get_download_url("complete") or ""
[docs]
def parse(
self,
input_path: Path | str | None,
complete_set_path: Path | str | None = None,
) -> Sec2PriMappingSet:
"""Parse HGNC withdrawn TSV file into an IdMappingSet.
Args:
input_path: Path to the withdrawn HGNC TSV file.
complete_set_path: Optional path to the HGNC complete set TSV.
When supplied, ``all_primary_ids`` on the returned mapping set
is populated with every current HGNC ID, not just those that
appear as ``object_id`` in a withdrawn to primary mapping.
Returns:
IdMappingSet with computed cardinalities based on IDs.
"""
if input_path is None:
raise ValueError("input_path must not be None")
input_path = Path(input_path)
self._resolve_version(input_path)
# Parse withdrawn file for ID mappings
mappings = self._parse_withdrawn(input_path)
# Create IdMappingSet and compute cardinalities
mapping_set = self._create_mapping_set(mappings, mapping_type="id")
# Populate the full primary ID set when the complete set is available
if complete_set_path is not None:
object.__setattr__(
mapping_set,
"_primary_ids",
self._extract_primary_ids(Path(complete_set_path)),
)
return mapping_set
[docs]
def parse_primary_ids(
self,
complete_set_path: Path | str | None,
) -> Sec2PriMappingSet:
"""Return a mapping set whose only content is the full primary ID list.
Reads the HGNC complete set to extract every current HGNC ID and
stores it in ``_primary_ids``. The ``mappings`` list is intentionally
left empty, this mapping set exists only to drive ``to_pri_ids()``.
Args:
complete_set_path: Path to the HGNC complete set TSV file.
Returns:
:class:`~pysec2pri.parsers.base.IdMappingSet` with no mappings and
``_primary_ids`` populated with all current HGNC IDs.
"""
if complete_set_path is None:
raise ValueError("complete_set_path must not be None")
complete_set_path = Path(complete_set_path)
self._resolve_version(complete_set_path)
mapping_set = self._create_mapping_set([], mapping_type="id")
object.__setattr__(
mapping_set,
"_primary_ids",
self._extract_primary_ids(complete_set_path),
)
return mapping_set
[docs]
def parse_symbols(
self,
complete_set_path: Path | str | None,
statuses: list[str] | None = None,
) -> Sec2PriMappingSet:
"""Parse HGNC complete set for symbol (label) mappings.
Args:
complete_set_path: Path to the complete HGNC set TSV file.
statuses: Entry statuses to include (e.g. ``["Approved"]``).
If ``None`` (default), all entries are included.
Returns:
LabelMappingSet with computed cardinalities based on labels.
"""
if complete_set_path is None:
raise ValueError("complete_set_path must not be None")
complete_set_path = Path(complete_set_path)
self._resolve_version(complete_set_path)
# Parse complete set for symbol mappings
mappings = self._parse_complete_set(complete_set_path, statuses=statuses)
# Create LabelMappingSet and compute cardinalities
mapping_set = self._create_mapping_set(mappings, mapping_type="label")
return mapping_set
[docs]
def parse_all(
self,
withdrawn_path: Path | str | None,
complete_set_path: Path | str | None,
) -> tuple[Sec2PriMappingSet, Sec2PriMappingSet]:
"""Parse both withdrawn and complete set files.
Args:
withdrawn_path: Path to the withdrawn HGNC TSV file.
complete_set_path: Path to the complete HGNC set TSV file.
Returns:
Tuple of (IdMappingSet, LabelMappingSet).
"""
id_mappings = self.parse(withdrawn_path)
label_mappings = self.parse_symbols(complete_set_path)
return id_mappings, label_mappings
def _extract_primary_ids(self, file_path: Path) -> set[str]:
"""Extract all current HGNC IDs from the complete set file.
Args:
file_path: Path to the HGNC complete set TSV file.
Returns:
Set of all HGNC IDs present in the complete set.
"""
df = pl.read_csv(
file_path,
separator="\t",
infer_schema_length=10000,
null_values=[""],
)
hgnc_id_col = self._find_column(df.columns, HGNC_ID)
if hgnc_id_col is None:
raise ValueError(f"Could not find hgnc_id column in {file_path}")
return {str(val) for val in df[hgnc_id_col].drop_nulls().to_list()}
def _parse_withdrawn(self, file_path: Path) -> list[Mapping]:
"""Parse withdrawn HGNC file for ID-to-ID mappings.
Args:
file_path: Path to the withdrawn HGNC TSV file.
Returns:
List of SSSOM Mapping objects.
"""
df = pl.read_csv(
file_path,
separator="\t",
infer_schema_length=10000,
null_values=[""],
)
merged_col = self._find_merged_column(df.columns, MERGED_INFO_PATTERNS)
if merged_col is None:
raise ValueError(f"Could not find merged_into_report column in {file_path}")
hgnc_id_col = self._find_column(df.columns, HGNC_ID)
if hgnc_id_col is None:
raise ValueError(f"Could not find hgnc_id column in {file_path}")
status_col = self._find_column(df.columns, STATUS)
symbol_col = self._find_column(df.columns, SYMBOL)
m_meta = self.get_mapping_metadata()
fixed = {
"mapping_justification": m_meta["mapping_justification"],
"subject_source": m_meta.get("subject_source"),
"object_source": m_meta.get("object_source"),
"mapping_tool": m_meta.get("mapping_tool"),
"license": m_meta.get("license"),
}
rows_data: list[dict[str, str | None]] = []
for row in df.iter_rows(named=True):
hgnc_id = row.get(hgnc_id_col)
if not hgnc_id:
continue
merged_info = row.get(merged_col)
status = row.get(status_col) if status_col else None
symbol = row.get(symbol_col) if symbol_col else None
# Case 1: Withdrawn with no replacement
if not merged_info and status and "Entry Withdrawn" in str(status):
rows_data.append(
{
"subject_id": hgnc_id,
"object_id": WITHDRAWN_ENTRY,
"subject_label": symbol or "",
"object_label": WITHDRAWN_ENTRY_LABEL,
"predicate_id": "oboInOwl:consider",
"comment": "Withdrawn entry with no replacement.",
}
)
continue
# Case 2: Merged into another entry
if merged_info:
parsed = self._parse_merged_info(merged_info)
if parsed:
target_id, target_symbol = parsed
rows_data.append(
{
"subject_id": hgnc_id,
"object_id": target_id,
"subject_label": symbol or "",
"object_label": target_symbol or "",
"predicate_id": m_meta["predicate_id"],
"predicate_label": m_meta.get("predicate_label"),
}
)
return self._build_mappings(
rows_data, fixed, desc="Processing withdrawn", total=len(rows_data)
)
def _parse_complete_set(
self, file_path: Path, statuses: list[str] | None = None
) -> list[Mapping]:
"""Parse complete HGNC set for symbol (label) mappings.
Args:
file_path: Path to the complete HGNC set TSV file.
statuses: Entry statuses to include (e.g. ``["Approved"]``).
If ``None`` (default), all entries are included.
Returns:
List of SSSOM Mapping objects for symbol mappings.
"""
df = pl.read_csv(
file_path,
separator="\t",
infer_schema_length=10000,
null_values=[""],
)
status_col = self._find_column(df.columns, STATUS)
hgnc_id_col = self._find_column(df.columns, HGNC_ID)
symbol_col = self._find_column(df.columns, SYMBOL)
alias_col = self._find_column(df.columns, ALIAS_SYMBOL)
prev_col = self._find_column(df.columns, PREV_SYMBOL)
if not all([status_col, hgnc_id_col, symbol_col]):
raise ValueError(f"Missing required columns in {file_path}")
assert hgnc_id_col is not None
assert symbol_col is not None
# Optionally filter by status
if statuses is not None and status_col:
df_approved = df.filter(pl.col(status_col).is_in(statuses))
else:
df_approved = df
m_meta = self.get_mapping_metadata()
fixed = {
"predicate_id": "oboInOwl:hasRelatedSynonym",
"mapping_justification": m_meta["mapping_justification"],
"subject_source": m_meta.get("subject_source"),
"object_source": m_meta.get("object_source"),
"mapping_tool": m_meta.get("mapping_tool"),
"license": m_meta.get("license"),
}
rows_data: list[dict[str, str | None]] = []
for row in df_approved.iter_rows(named=True):
hgnc_id = row.get(hgnc_id_col)
symbol = row.get(symbol_col)
if not hgnc_id or not symbol:
continue
alias_str = row.get(alias_col) if alias_col else None
prev_str = row.get(prev_col) if prev_col else None
aliases = self._split_symbols(alias_str) if alias_str else []
prev_symbols = self._split_symbols(prev_str) if prev_str else []
for alias in aliases:
rows_data.append(
{
"subject_id": hgnc_id,
"subject_label": alias,
"object_id": hgnc_id,
"object_label": symbol,
"comment": "Alias symbol mapping.",
}
)
for prev in prev_symbols:
rows_data.append(
{
"subject_id": hgnc_id,
"subject_label": prev,
"object_id": hgnc_id,
"object_label": symbol,
"comment": "Previous symbol mapping.",
}
)
return self._build_mappings(
rows_data, fixed, desc="Processing symbols", total=len(rows_data)
)
def _create_mapping_set(
self, mappings: list[Mapping], mapping_type: str = "id"
) -> Sec2PriMappingSet:
"""Create an IdMappingSet or LabelMappingSet with config metadata.
Delegates to BaseParser.create_mapping_set().
"""
return self.create_mapping_set(mappings, mapping_type)
__all__ = ["HGNCParser"]