# Source code for textnets.corpus

"""Implements the features relating to language."""

from __future__ import annotations

import os
import sqlite3
from os import cpu_count
from pathlib import Path
from typing import TYPE_CHECKING, Any, Self

import numpy as np
import pandas as pd
import spacy
from toolz import compose, identity, memoize, partial
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import thread_map
from wasabi import msg

import textnets as tn

from ._util import LiteFrame, df_split

if TYPE_CHECKING:
    from collections.abc import Callable, Iterator, Sequence

    from spacy.tokens import Token
    from spacy.tokens.doc import Doc

    #: Custom type for objects resembling documents (token sequences).
    DocLike = Doc | Sequence[Token]

#: Mapping of language codes to spaCy language model names.
#: ``Corpus`` looks a requested language up here and keeps the given string
#: unchanged when it is not one of these keys.
LANGS = {
    "ca": "ca_core_news_sm",  # Catalan
    "da": "da_core_news_sm",  # Danish
    "de": "de_core_news_sm",  # German
    "el": "el_core_news_sm",  # Greek
    "en": "en_core_web_sm",  # English
    "es": "es_core_news_sm",  # Spanish
    "fi": "fi_core_news_sm",  # Finnish
    "fr": "fr_core_news_sm",  # French
    "hr": "hr_core_news_sm",  # Croatian
    "it": "it_core_news_sm",  # Italian
    "ja": "ja_core_news_sm",  # Japanese
    "ko": "ko_core_news_sm",  # Korean
    "lt": "lt_core_news_sm",  # Lithuanian
    "mk": "mk_core_news_sm",  # Macedonian
    "nb": "nb_core_news_sm",  # Norwegian
    "nl": "nl_core_news_sm",  # Dutch
    "pl": "pl_core_news_sm",  # Polish
    "pt": "pt_core_news_sm",  # Portuguese
    "ro": "ro_core_news_sm",  # Romanian
    "ru": "ru_core_news_sm",  # Russian
    "sl": "sl_core_news_sm",  # Slovenian
    "sv": "sv_core_news_sm",  # Swedish
    "uk": "uk_core_news_sm",  # Ukrainian
    "zh": "zh_core_web_sm",  # Chinese
}

#: Names of the spaCy models reported as installed when this module is
#: imported. ``Corpus`` appends to this list after a successful download.
_INSTALLED_MODELS = spacy.util.get_installed_models()


class Corpus:
    """
    Corpus of labeled documents.

    Parameters
    ----------
    data : Series
        Series containing the documents. The index must contain document
        labels.
    lang : str, optional
        The language model to use (default set by "lang" parameter).

    Attributes
    ----------
    documents : Series
        The corpus documents.
    lang : str
        The language model used (ISO code or spaCy model name).

    Raises
    ------
    ValueError
        If the supplied data is empty.
    """

    def __init__(
        self,
        data: pd.Series,
        lang: str | None = None,
    ) -> None:
        if data.empty:
            raise ValueError("Corpus data is empty.")
        # Work on a copy so the caller's series is never modified.
        documents: pd.Series = data.copy()
        if missings := documents.isna().sum():
            msg.warn(f"Dropping {missings} empty document(s).")
            documents = documents[~documents.isna()]
        if duplicated := documents.index.duplicated().sum():
            msg.info(
                f"There are {duplicated} duplicate labels. Concatenating documents."
            )
            # Documents sharing a label are merged, separated by a blank line.
            documents = documents.groupby(level=0).agg("\n\n".join)
        documents.index = documents.index.set_names(["label"])
        self.documents = documents
        if lang is None:
            lang = tn.params["lang"]
        # Known language codes map to model names; anything else is kept as is.
        self.lang = LANGS.get(lang, lang)
        if self.lang not in _INSTALLED_MODELS:
            msg.info(f"Language model '{self.lang}' is not yet installed.")

    @property
    @memoize
    def nlp(self) -> pd.Series:
        """Corpus documents with NLP applied."""
        norm_docs: pd.Series = self.documents.map(_normalize_whitespace)
        max_length = max(map(len, norm_docs))
        if max_length > 1_000_000:
            msg.info("Corpus contains very long documents. Memory usage will be high.")
            # Raise the pipeline's length limit so the longest document fits.
            self._nlp_pipeline.max_length = max_length
        # ``disable`` is True when the progress bar is switched off and None
        # otherwise, leaving the decision to tqdm.
        tqdm_args = dict(disable=not tn.params["progress_bar"] or None, unit="docs")
        cores = cpu_count() or 1
        if cores > 1 and len(self.documents) >= cores:
            # Split the corpus into one chunk per core and process the chunks
            # in a thread pool.
            nlp_ufunc = np.frompyfunc(self._nlp_pipeline, 1, 1)
            doc_chunks = df_split(norm_docs, cores)
            return pd.concat(thread_map(nlp_ufunc, doc_chunks, **tqdm_args))
        tqdm.pandas(**tqdm_args)
        return norm_docs.progress_map(self._nlp_pipeline)

    @property
    @memoize
    def _nlp_pipeline(self) -> spacy.Language:
        """
        Load the spaCy pipeline for the corpus language.

        If the model cannot be loaded and the "autodownload" parameter is
        set, a download is attempted. Otherwise the error is re-raised for
        models listed in `LANGS`. In all remaining cases a blank pipeline for
        the language is returned.
        """
        model_opts: dict[str, dict | list] = {"exclude": ["ner", "textcat"]}
        if self.lang.startswith("zh"):
            model_opts["config"] = {"nlp": {"tokenizer": {"segmenter": "jieba"}}}
        try:
            return spacy.load(self.lang, **model_opts)  # type:ignore
        except OSError:
            if tn.params["autodownload"]:
                try:
                    spacy.cli.download(self.lang)  # type: ignore
                    _INSTALLED_MODELS.append(self.lang)
                    return spacy.load(self.lang, **model_opts)  # type:ignore
                except (KeyError, OSError):
                    # Download or second load failed: fall back to a blank
                    # pipeline below.
                    pass
            elif self.lang in LANGS.values():
                raise
            msg.info(f"Using basic '{self.lang}' language model.")
            return spacy.blank(self.lang)

    def __len__(self) -> int:
        return len(self.documents)

    def __getitem__(self, key: str) -> str:
        return self.documents[key]

    @classmethod
    def from_df(
        cls,
        data: pd.DataFrame,
        doc_col: str | None = None,
        lang: str | None = None,
    ) -> Self:
        """
        Create corpus from data frame.

        Parameters
        ----------
        data : DataFrame
            DataFrame containing documents. The index must contain document
            labels.
        doc_col : str, optional
            Indicates which column of ``data`` contains the document texts. If
            none is specified, the first column with strings is used.
        lang : str, optional
            The language model to use (default set by "lang" parameter).

        Returns
        -------
        `Corpus`

        Raises
        ------
        NoDocumentColumnException
            If no document column can be detected.
        """
        if doc_col is None:
            object_cols = data.select_dtypes(include="object").columns
            if object_cols.empty:
                raise NoDocumentColumnException("No suitable document column.")
            # Use the label itself so that non-string column names work too.
            doc_col = object_cols[0]
        # No frame-wide copy needed: ``__init__`` copies the selected series.
        return cls(data[doc_col], lang=lang)

    @classmethod
    def from_dict(
        cls,
        data: dict[Any, str],
        lang: str | None = None,
    ) -> Self:
        """
        Create corpus from dictionary.

        Parameters
        ----------
        data : dict
            Dictionary containing the documents as values and document labels
            as keys.
        lang : str, optional
            The language model to use (default set by "lang" parameter).

        Returns
        -------
        `Corpus`
        """
        return cls(pd.Series(data), lang=lang)

    @classmethod
    def from_files(
        cls,
        files: str | list[str] | list[Path] | Iterator[Path],
        doc_labels: list[str] | None = None,
        lang: str | None = None,
    ) -> Self:
        """Construct corpus from files.

        Parameters
        ----------
        files : str or list of str or list of Path
            Path to files (with globbing pattern) or list of file paths.
        doc_labels : list of str, optional
            Labels for documents (default: file name without suffix).
        lang : str, optional
            The language model to use (default set by "lang" parameter).

        Returns
        -------
        `Corpus`

        Raises
        ------
        IsADirectoryError
            If the provided path is a directory. (Use globbing.)
        FileNotFoundError
            If the provided path does not exist.
        """
        if isinstance(files, str):
            pattern = Path(files).expanduser()
            files = pattern.parent.glob(pattern.name)
        # Expand ``~`` once so that the path that is validated is also the
        # path that is read.
        paths = [Path(f).expanduser() for f in files]
        for path in paths:
            if path.is_file():
                continue
            if path.exists():
                raise IsADirectoryError(path.name)
            raise FileNotFoundError(path.name)
        if not doc_labels:
            doc_labels = [path.stem for path in paths]
        data = pd.DataFrame({"path": paths}, index=doc_labels)
        data["raw"] = data["path"].map(_read_file)
        return cls.from_df(data, doc_col="raw", lang=lang)

    @classmethod
    def from_csv(
        cls,
        path: str,
        label_col: str | None = None,
        doc_col: str | None = None,
        lang: str | None = None,
        **kwargs,
    ) -> Self:
        """Read corpus from comma-separated value file.

        Parameters
        ----------
        path : str
            Path to CSV file.
        label_col : str, optional
            Column that contains document labels (default: None, in which case
            the first column is used).
        doc_col : str, optional
            Column that contains document text (default: None, in which case
            the first text column is used).
        lang : str, optional
            The language model to use (default set by "lang" parameter).
        kwargs
            Arguments to pass to `pandas.read_csv`.

        Returns
        -------
        `Corpus`
        """
        kwargs.setdefault("index_col", label_col)
        data = pd.read_csv(path, **kwargs)
        # Only fall back to the first column when no index was read at all;
        # an ``index_col`` passed through ``kwargs`` must not be overwritten.
        if isinstance(data.index, pd.RangeIndex):
            data = data.set_index(data.columns[0])
        return cls.from_df(data, doc_col=doc_col, lang=lang)

    @classmethod
    def from_sql(
        cls,
        qry: str,
        conn: str | object,
        label_col: str | None = None,
        doc_col: str | None = None,
        lang: str | None = None,
        **kwargs,
    ) -> Self:
        """Read corpus from SQL database.

        Parameters
        ----------
        qry : str
            SQL query
        conn : str or object
            Database URI or connection object.
        label_col : str, optional
            Column that contains document labels (default: None, in which case
            the first column is used).
        doc_col : str, optional
            Column that contains document text (default: None, in which case
            the first text column is used).
        lang : str, optional
            The language model to use (default set by "lang" parameter).
        kwargs
            Arguments to pass to `pandas.read_sql`.

        Returns
        -------
        `Corpus`
        """
        kwargs.setdefault("index_col", label_col)
        data = pd.read_sql(qry, conn, **kwargs)
        # Only fall back to the first column when no index was read at all;
        # an ``index_col`` passed through ``kwargs`` must not be overwritten.
        if isinstance(data.index, pd.RangeIndex):
            data = data.set_index(data.columns[0])
        return cls.from_df(data, doc_col=doc_col, lang=lang)

    def save(self, target: os.PathLike[Any] | str) -> None:
        """
        Save a corpus to file.

        Parameters
        ----------
        target : str or path
            File to save the corpus to. If the file exists, it will be
            overwritten.
        """
        conn = sqlite3.connect(Path(target))
        meta = {"lang": self.lang}
        try:
            # The connection's context manager only commits or rolls back the
            # transaction; closing is done explicitly in ``finally``.
            with conn as c:
                self.documents.to_sql("corpus_documents", c, if_exists="replace")
                pd.Series(meta, name="values").to_sql(
                    "corpus_meta", c, if_exists="replace", index_label="keys"
                )
        finally:
            conn.close()

    @classmethod
    def load(cls, source: os.PathLike[Any] | str) -> Self:
        """
        Load a corpus from file.

        Parameters
        ----------
        source : str or path
            File to read the corpus from. This should be a file created by
            `Corpus.save`.

        Returns
        -------
        `Corpus`

        Raises
        ------
        FileNotFoundError
            If the specified path does not exist.
        """
        if not Path(source).exists():
            raise FileNotFoundError(f"File '{source}' does not exist.")
        conn = sqlite3.connect(Path(source))
        try:
            with conn:
                documents = pd.read_sql(
                    "SELECT * FROM corpus_documents", conn, index_col="label"
                )
                meta = pd.read_sql(
                    "SELECT * FROM corpus_meta", conn, index_col="keys"
                )["values"]
        finally:
            # ``with conn`` does not close the connection, so do it here.
            conn.close()
        return cls.from_df(documents, lang=meta["lang"])

    def tokenized(
        self,
        remove: list[str] | None = None,
        stem: bool = True,
        remove_stop_words: bool = True,
        remove_urls: bool = True,
        remove_numbers: bool = True,
        remove_punctuation: bool = True,
        lower: bool = True,
        sublinear: bool = True,
    ) -> TidyText:
        """Return tokenized version of corpus in tidy format.

        Parameters
        ----------
        remove : list of str, optional
            Additional tokens to remove.
        stem : bool, optional
            Return token stems (default: True, if available).
        remove_stop_words : bool, optional
            Remove stop words (default: True).
        remove_urls : bool, optional
            Remove URL and email address tokens (default: True).
        remove_numbers : bool, optional
            Remove number tokens (default: True).
        remove_punctuation : bool, optional
            Remove punctuation marks, brackets, and quotation marks
            (default: True).
        lower : bool, optional
            Make lower-case (default: True).
        sublinear : bool, optional
            Apply sublinear scaling when calculating *tf-idf* term weights
            (default: True).

        Returns
        -------
        `pandas.DataFrame`
            A data frame with document labels (index), tokens (term), and
            per-document counts (n).
        """
        # Disable stemming if there is no lemmatizer available
        stem = all(["lemmatizer" in self._nlp_pipeline.pipe_names, stem])
        # ``compose`` applies right to left: the token filters run first, then
        # tokens are turned into strings, lower-cased and finally filtered
        # against ``remove``.
        func = compose(
            (
                partial(_remove_additional, token_list=remove)
                if remove is not None
                else identity
            ),
            _lower if lower else identity,
            _stem if stem else _as_text,
            _remove_stop_words if remove_stop_words else identity,
            _remove_urls if remove_urls else identity,
            _remove_numbers if remove_numbers else identity,
            _remove_punctuation if remove_punctuation else identity,
        )
        tt = self._make_tidy_text(func)
        return _tf_idf(tt, sublinear)

    def noun_phrases(
        self,
        normalize: bool = False,
        remove: list[str] | None = None,
        sublinear: bool = True,
    ) -> TidyText:
        """Return noun phrases from corpus in tidy format.

        Parameters
        ----------
        normalize : bool, optional
            Return lemmas of noun phrases (default: False).
        remove : list of str, optional
            Additional tokens to remove.
        sublinear : bool, optional
            Apply sublinear scaling when calculating *tf-idf* term weights
            (default: True).

        Returns
        -------
        `pandas.DataFrame`
            A data frame with document labels (index), noun phrases (term),
            and per-document counts (n).
        """
        # Applied right to left: extract noun chunks, then drop ``remove``.
        func = compose(
            (
                partial(_remove_additional, token_list=remove)
                if remove is not None
                else identity
            ),
            partial(_noun_chunks, normalize=normalize),
        )
        tt = self._make_tidy_text(func)
        return _tf_idf(tt, sublinear)

    def ngrams(
        self,
        size: int,
        remove: list[str] | None = None,
        stem: bool = False,
        remove_stop_words: bool = False,
        remove_urls: bool = False,
        remove_numbers: bool = False,
        remove_punctuation: bool = False,
        lower: bool = False,
        sublinear: bool = True,
    ) -> TidyText:
        """Return n-grams of length n from corpus in tidy format.

        Parameters
        ----------
        size : int
            Size of n-grams to return.
        remove : list of str, optional
            Additional tokens to remove.
        stem : bool, optional
            Return token stems (default: False).
        remove_stop_words : bool, optional
            Remove stop words (default: False).
        remove_urls : bool, optional
            Remove URL and email address tokens (default: False).
        remove_numbers : bool, optional
            Remove number tokens (default: False).
        remove_punctuation : bool, optional
            Remove punctuation marks, brackets, and quotation marks
            (default: False).
        lower : bool, optional
            Make lower-case (default: False).
        sublinear : bool, optional
            Apply sublinear scaling when calculating *tf-idf* term weights
            (default: True).

        Returns
        -------
        `pandas.DataFrame`
            A data frame with document labels (index), n-grams (term), and
            per-document counts (n).
        """
        # Same right-to-left pipeline as ``tokenized`` with the n-gram step
        # applied last, i.e. to the already filtered list of strings.
        func = compose(
            partial(_ngrams, n=size),
            (
                partial(_remove_additional, token_list=remove)
                if remove is not None
                else identity
            ),
            _lower if lower else identity,
            _stem if stem else _as_text,
            _remove_stop_words if remove_stop_words else identity,
            _remove_urls if remove_urls else identity,
            _remove_numbers if remove_numbers else identity,
            _remove_punctuation if remove_punctuation else identity,
        )
        tt = self._make_tidy_text(func)
        return _tf_idf(tt, sublinear)

    def _make_tidy_text(self, func: Callable[[Doc], list[str]]) -> TidyText:
        """Apply ``func`` to every parsed document and count terms per label."""
        tt = (
            pd
            # One column per term position, then melted to one row per term.
            .melt(
                self.nlp.map(func).apply(pd.Series).reset_index(),
                id_vars=["label"],
                value_name="term",
            )
            # After counting, the former position column holds the number of
            # occurrences of a term in a document.
            .rename(columns={"variable": "n"})
            .groupby(["label", "term"])
            .count()
            .reset_index()
            .set_index("label")
        )
        return TidyText(tt)

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__} with {len(self.documents)} documents "
            + f"using language model '{self.lang}'>"
        )

    def _repr_html_(self) -> str:
        """Return an HTML summary with a collapsible table of the documents."""
        tbl = pd.DataFrame(self.documents).to_html(
            header=False,
            notebook=False,
            border=0,
            classes=("full-width", "left-align"),
            max_rows=10,
        )
        return f""" <style scoped> .full-width {{ width: 100%; }} .left-align td, .left-align th {{ text-align: left; }} summary {{ cursor: help; list-style: none; }} details[open] summary {{ margin-bottom: 1em; }} </style> <details> <summary> <table class="full-width"> <tr style="font-weight: 600;"> <td style="text-align: left;"> <kbd>{self.__class__.__name__}</kbd> </td> <td style="color: dodgerblue;"> <svg width="1ex" height="1ex"> <rect width="1ex" height="1ex" fill="dodgerblue"> </svg> Docs: {self.documents.shape[0]} </td> <td style="color: darkgray;"> Lang: {self.lang} </td> </tr> </table> </summary> {tbl} </details>"""
def _read_file(file_name: Path) -> str:
    """Read contents of file ignoring any unicode errors."""
    # Bytes that are not valid UTF-8 are replaced instead of raising.
    return file_name.read_bytes().decode("utf-8", "replace").strip()


def _normalize_whitespace(string: str) -> str:
    """Replace all whitespace with single spaces."""
    return " ".join(string.split())


def _noun_chunks(doc: Doc, normalize: bool) -> list[str]:
    """Return the noun chunks of a document as strings.

    Chunks consisting only of stop words are skipped. With ``normalize`` the
    chunk lemma is returned, otherwise the lower-cased tokens joined by
    spaces.
    """
    return [
        (chunk.lemma_ if normalize else " ".join([t.lower_ for t in chunk]))
        for chunk in doc.noun_chunks
        if not all(token.is_stop for token in chunk)
    ]


def _remove_stop_words(doc: DocLike) -> list[Token]:
    """Return document without stop words."""
    return [word for word in doc if not word.is_stop]


def _remove_urls(doc: DocLike) -> list[Token]:
    """Return document without URLs or email addresses."""
    return [word for word in doc if not word.like_url and not word.like_email]


def _remove_numbers(doc: DocLike) -> list[Token]:
    """Return document without numbers."""
    return [word for word in doc if not word.like_num]


def _remove_punctuation(doc: DocLike) -> list[Token]:
    """Return document without punctuation, brackets and quotation marks."""
    return [
        word
        for word in doc
        if not word.is_punct and not word.is_bracket and not word.is_quote
    ]


def _stem(doc: DocLike) -> list[str]:
    """Return list of word stem strings."""
    return [word.lemma_ for word in doc]


def _as_text(doc: DocLike) -> list[str]:
    """Turn document into list of strings."""
    return [word.text for word in doc]


def _lower(doc: list[str]) -> list[str]:
    """Return list of strings in lower case."""
    return [s.lower() for s in doc]


def _remove_additional(doc: list[str], token_list: list[str]) -> list[str]:
    """Return list of strings without specified tokens."""
    return [s for s in doc if s not in token_list]


def _ngrams(doc: list[str], n: int) -> list[str]:
    """Return list of n-gram strings."""
    # Zipping the list with its own shifted copies yields every window of
    # ``n`` consecutive items; lists shorter than ``n`` give no n-grams.
    return [" ".join(t) for t in zip(*[doc[offset:] for offset in range(n)])]


def _tf_idf(tidy_text: pd.DataFrame | TidyText, sublinear: bool) -> TidyText:
    """Calculate term frequency/inverse document frequency.

    Parameters
    ----------
    tidy_text : DataFrame or TidyText
        Frame indexed by document label with the columns ``term`` and ``n``.
    sublinear : bool
        Scale counts logarithmically instead of dividing them by the total
        count of the document.

    Returns
    -------
    `TidyText`
        The columns ``term``, ``n`` and ``term_weight``.
    """
    if sublinear:
        tidy_text["tf"] = tidy_text["n"].map(_sublinear_scaling)
    else:
        # Sum only the counts; summing the whole frame would also
        # concatenate the strings in the ``term`` column for no use.
        totals = (
            tidy_text
            .groupby(tidy_text.index)[["n"]]
            .sum()
            .rename(columns={"n": "total"})
        )
        tidy_text = tidy_text.merge(totals, right_index=True, left_index=True)
        tidy_text["tf"] = tidy_text["n"] / tidy_text["total"]
    # Each row is one (document, term) pair, so counting the rows per term
    # gives the number of documents that contain it.
    n_docs = len(set(tidy_text.index))
    # Name the series explicitly rather than relying on the name pandas
    # gives the result of ``value_counts``.
    idfs = np.log10(n_docs / tidy_text["term"].value_counts()).rename("idf")
    tt = tidy_text.merge(idfs.to_frame(), left_on="term", right_index=True)
    tt["term_weight"] = tt["tf"] * tt["idf"]
    return TidyText(tt[["term", "n", "term_weight"]])


def _sublinear_scaling(n: int | float) -> float:
    """Logarithmic scaling function."""
    return 1 + np.log10(n) if n > 0 else 0
class NoDocumentColumnException(Exception):
    """Raised if no suitable document column is specified or found.

    ``Corpus.from_df`` raises it when no ``doc_col`` is given and the data
    frame has no column of ``object`` dtype to fall back on.
    """
class TidyText(LiteFrame):
    """Collection of tokens with per-document counts.

    Returned by ``Corpus.tokenized``, ``Corpus.noun_phrases`` and
    ``Corpus.ngrams`` with the columns ``term``, ``n`` and ``term_weight``.
    """