"""UniProt file parser for secondary-to-primary identifier mappings.
This parser extracts ID-to-ID mappings:
- Secondary accessions -> primary accessions (from sec_ac.txt)
- Deleted accessions -> sssom:NoTermFound (from delac_sp.txt)
Uses SSSOM-compliant IdMappingSet with cardinality computation.
"""
from __future__ import annotations
from pathlib import Path
import polars as pl
from sssom_schema import Mapping
from pysec2pri.parsers.base import (
WITHDRAWN_ENTRY,
WITHDRAWN_ENTRY_LABEL,
BaseDownloader,
BaseParser,
Sec2PriMappingSet,
)
[docs]
class UniProtParser(BaseParser):
    """Parser for UniProt accession list files, implemented with Polars.

    Builds secondary -> primary accession mappings from ``sec_ac.txt`` and
    deleted-accession mappings from ``delac_sp.txt``; it can also collect the
    accessions listed in ``acindex.txt``.  Every identifier is emitted as a
    ``UniProtKB:<AC>`` CURIE and all mappings are wrapped in an ID-type
    mapping set.
    """

    datasource_name = "uniprot"

    # UniProtKB accession syntax.  Shared by every file reader in this class so
    # that header/footer text and other non-data tokens are rejected the same
    # way everywhere.
    _ACCESSION_REGEX = (
        r"^[OPQ][0-9][A-Z0-9]{3}[0-9]$|^[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}$"
    )

    @property
    def sec_ac_url(self) -> str:
        """Return the configured ``sec_ac`` download URL, or ``""`` if unset."""
        return self.get_download_url("sec_ac") or ""

    @property
    def delac_url(self) -> str:
        """Return the configured ``delac_sp`` download URL, or ``""`` if unset."""
        return self.get_download_url("delac_sp") or ""

    def parse(
        self,
        input_path: Path | str | None = None,
        delac_path: Path | str | None = None,
    ) -> Sec2PriMappingSet:
        """Parse the UniProt mapping files into one mapping set.

        Either file may be omitted; mappings from ``input_path`` come first,
        followed by those from ``delac_path``.

        Args:
            input_path: Path to sec_ac.txt (secondary accessions file).
            delac_path: Path to delac_sp.txt (deleted accessions file).

        Returns:
            The mapping set built by ``_create_mapping_set`` from all
            collected mappings (empty when both paths are ``None``).
        """
        # The version is resolved from the first path that was supplied
        # (``None`` is passed through when neither was).
        self._resolve_version(
            Path(input_path)
            if input_path is not None
            else (Path(delac_path) if delac_path is not None else None)
        )

        mappings: list[Mapping] = []
        if input_path is not None:
            mappings.extend(self._parse_sec_ac(Path(input_path)))
        if delac_path is not None:
            mappings.extend(self._parse_delac(Path(delac_path)))
        return self._create_mapping_set(mappings)

    @staticmethod
    def _count_header_lines(file_path: Path) -> int:
        """Count the lines up to and including the first one starting with ``_``.

        The line after that underscore separator is treated as the first data
        row.  If no such line exists, the total number of lines is returned.

        Args:
            file_path: Text file to inspect (read as UTF-8).

        Returns:
            Number of leading lines to skip before the data rows.
        """
        header_lines = 0
        with file_path.open("r", encoding="utf-8") as handle:
            for raw_line in handle:
                header_lines += 1
                if raw_line.startswith("_"):
                    break
        return header_lines

    def _parse_sec_ac(self, file_path: Path) -> list[Mapping]:
        """Parse sec_ac.txt for secondary -> primary accession mappings.

        For each data line the first space-delimited token becomes the
        ``subject_id`` and the last one the ``object_id``.  A row is kept only
        when both tokens are well-formed UniProtKB accessions and differ from
        each other, so stray text after the header (for example footer lines)
        never turns into a mapping.

        Args:
            file_path: Path to sec_ac.txt file.

        Returns:
            List of SSSOM Mapping objects (empty when no row qualifies).
        """
        accession = self._ACCESSION_REGEX
        df = (
            # Read every physical line as a single string column.
            pl.scan_csv(
                file_path,
                has_header=False,
                skip_rows=self._count_header_lines(file_path),
                separator="\n",
                new_columns=["line"],
                infer_schema_length=0,
                quote_char=None,
            )
            # Trim first: otherwise trailing blanks make the "last token" an
            # empty string and the row would be silently lost.
            .with_columns(pl.col("line").str.strip_chars())
            .filter(
                pl.col("line").str.len_chars() > 0,
                ~pl.col("line").str.starts_with("-"),
                ~pl.col("line").str.starts_with("_"),
            )
            .with_columns(
                pl.col("line")
                .str.split_exact(" ", 1)
                .struct.field("field_0")
                .str.strip_chars()
                .alias("subject_id"),
                pl.col("line").str.split(" ").list.last().str.strip_chars().alias("object_id"),
            )
            .filter(
                # Same accession check that _parse_delac applies.
                pl.col("subject_id").str.contains(accession),
                pl.col("object_id").str.contains(accession),
                pl.col("subject_id") != pl.col("object_id"),
            )
            .with_columns(
                (pl.lit("UniProtKB:") + pl.col("subject_id")).alias("subject_id"),
                (pl.lit("UniProtKB:") + pl.col("object_id")).alias("object_id"),
            )
            .select(["subject_id", "object_id"])
            .collect()
        )
        if df.is_empty():
            return []

        # Values that are identical for every mapping from this file.
        m_meta = self.get_mapping_metadata()
        fixed = {
            "predicate_id": m_meta["predicate_id"],
            "predicate_label": m_meta.get("predicate_label"),
            "mapping_justification": m_meta["mapping_justification"],
            "subject_source": m_meta.get("subject_source"),
            "object_source": m_meta.get("object_source"),
            "mapping_tool": m_meta.get("mapping_tool"),
            "license": m_meta.get("license"),
        }
        rows = df.to_dicts()
        return self._build_mappings(rows, fixed, desc="Processing sec_ac", total=len(rows))

    def _parse_delac(self, file_path: Path) -> list[Mapping]:
        """Parse delac_sp.txt for deleted accession mappings.

        Every well-formed accession after the header becomes the ``object_id``
        of a mapping whose ``subject_id`` is the ``WITHDRAWN_ENTRY``
        placeholder, with predicate ``oboInOwl:consider``.

        Args:
            file_path: Path to delac_sp.txt file.

        Returns:
            List of SSSOM Mapping objects (empty when no accession is found).
        """
        df = (
            pl.scan_csv(
                file_path,
                has_header=False,
                skip_rows=self._count_header_lines(file_path),
                separator="\t",
                new_columns=["accession"],
                infer_schema_length=0,
                quote_char=None,
            )
            .with_columns(pl.col("accession").str.strip_chars())
            # Keeps accessions only; blank lines and any other text are dropped.
            .filter(pl.col("accession").str.contains(self._ACCESSION_REGEX))
            .with_columns((pl.lit("UniProtKB:") + pl.col("accession")).alias("object_id"))
            .select("object_id")
            .collect()
        )
        if df.is_empty():
            return []

        m_meta = self.get_mapping_metadata()
        # NOTE(review): here the retired accession is the *object* and the
        # placeholder the subject, whereas _parse_sec_ac puts the retired
        # (secondary) accession on the *subject* side - confirm this
        # orientation is intended.
        fixed = {
            "subject_id": WITHDRAWN_ENTRY,
            "subject_label": WITHDRAWN_ENTRY_LABEL,
            "predicate_id": "oboInOwl:consider",
            "mapping_justification": m_meta["mapping_justification"],
            "subject_source": m_meta.get("subject_source"),
            "object_source": m_meta.get("object_source"),
            "mapping_tool": m_meta.get("mapping_tool"),
            "license": m_meta.get("license"),
            "comment": "Deleted accession with no replacement.",
        }
        rows = df.to_dicts()
        return self._build_mappings(rows, fixed, desc="Processing delac", total=len(rows))

    def _create_mapping_set(
        self, mappings: list[Mapping], mapping_type: str = "id"
    ) -> Sec2PriMappingSet:
        """Delegate to the base class ``create_mapping_set`` (ID type by default)."""
        return self.create_mapping_set(mappings, mapping_type)

    def parse_primary_ids(
        self,
        acindex_path: Path | str | None = None,
    ) -> Sec2PriMappingSet:
        """Return a mapping set carrying the accessions listed in ``acindex.txt``.

        The file lists accessions after a ``__________`` separator line; only
        the first whitespace-delimited token of each data line is considered,
        and only if it is a well-formed UniProtKB accession.

        For versioned (legacy) releases the file can be found at::

            https://ftp.uniprot.org/pub/databases/uniprot/previous_releases/
            release-{version}/knowledgebase/docs/acindex.txt.gz

        Args:
            acindex_path: Local path to ``acindex.txt`` (plain or ``.gz``).
                When ``None`` the file is fetched via
                ``pysec2pri.api._auto_download``.

        Returns:
            A mapping set with no mappings whose ``_primary_ids`` attribute
            holds the collected ``UniProtKB:<AC>`` CURIEs.
        """
        if acindex_path is None:
            # Imported lazily, only when a download is actually needed.
            from pysec2pri.api import _auto_download

            acindex_path = _auto_download("uniprot", None, keys=["acindex"])["acindex"]
        acindex_path = Path(str(acindex_path))
        self._resolve_version(acindex_path)
        primary_ids = self._extract_primary_ids_from_acindex(acindex_path)
        ms = self._create_mapping_set([], mapping_type="id")
        # Set through object.__setattr__ so the attribute is attached without
        # going through the mapping set's own __setattr__.
        object.__setattr__(ms, "_primary_ids", primary_ids)
        return ms

    def _extract_primary_ids_from_acindex(self, file_path: Path) -> set[str]:
        """Parse ``acindex.txt`` and return the accessions it lists.

        Skips the header block (everything up to and including the first line
        starting with ``__``), then takes the first whitespace-delimited token
        of each non-empty line.  Tokens that are not well-formed UniProtKB
        accessions (separator rows, footer text, continuation lines, ...) are
        ignored.

        Args:
            file_path: Path to ``acindex.txt`` (plain or ``.gz``).

        Returns:
            Set of ``UniProtKB:<AC>`` CURIEs.
        """
        import gzip
        import re

        is_accession = re.compile(self._ACCESSION_REGEX).match
        opener = gzip.open if file_path.suffix == ".gz" else open
        primary_ids: set[str] = set()
        in_data = False
        with opener(file_path, "rt", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                stripped = line.strip()
                if not in_data:
                    # Still in the header; the separator row itself is skipped too.
                    in_data = stripped.startswith("__")
                    continue
                if not stripped:
                    continue
                token = stripped.split()[0]
                if is_accession(token):
                    primary_ids.add(f"UniProtKB:{token}")
        return primary_ids
class UniProtDownloader(BaseDownloader):
    """Fetches UniProt data files and enumerates archived releases."""

    datasource_name = "uniprot"

    def get_download_urls(
        self,
        version: str | None = None,
        **kwargs: object,
    ) -> dict[str, str]:
        """Return the download URLs for *version*, or the configured ones.

        Args:
            version: Release identifier; when falsy, the URLs stored in the
                loaded config are returned instead.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            Dictionary of download URLs.

        Raises:
            ValueError: If no version is given and no config is loaded.
        """
        from pysec2pri.download import _get_uniprot_urls_for_version

        if not version:
            if not self._config:
                raise ValueError("UniProt config not loaded")
            # Copy so callers cannot mutate the config's own mapping.
            return dict(self._config.download_urls)
        return _get_uniprot_urls_for_version(version)

    def download(
        self,
        output_dir: Path,
        version: str | None = None,
        decompress: bool = True,
        **kwargs: object,
    ) -> dict[str, Path]:
        """Download the UniProt files for *version* into *output_dir*.

        Args:
            output_dir: Destination directory.
            version: Release identifier forwarded to ``get_download_urls``.
            decompress: Forwarded to ``_download_urls``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            Whatever ``_download_urls`` returns for the resolved URLs.
        """
        return self._download_urls(self.get_download_urls(version), output_dir, decompress)

    def list_versions(self) -> list[str]:
        """List the release versions linked from the configured archive page.

        Fetches the archive URL and collects every ``release-YYYY_MM/`` link
        target found in the response text.

        Returns:
            Sorted, de-duplicated version strings
            (e.g. ``["2024_01", "2024_02", ...]``).

        Raises:
            ValueError: If the archive URL is not configured.
        """
        import re

        import httpx

        if not self._config or not self._config.archive_url:
            raise ValueError("UniProt archive URL not configured")

        index_url = self._config.archive_url
        with httpx.Client(follow_redirects=True, timeout=30.0) as http:
            listing = http.get(index_url)
            listing.raise_for_status()

        # HTML index: links look like href="release-2024_01/".
        release_link = re.compile(r'href="release-(\d{4}_\d{2})/')
        versions = set(release_link.findall(listing.text))
        return sorted(versions)
# Public names of this module (what ``from <module> import *`` exports).
__all__ = ["UniProtDownloader", "UniProtParser"]