"""Implements the features relating to language."""
from __future__ import annotations
import os
import sqlite3
from os import cpu_count
from pathlib import Path
from typing import TYPE_CHECKING, Any, Self
import numpy as np
import pandas as pd
import spacy
from toolz import compose, identity, memoize, partial
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import thread_map
from wasabi import msg
import textnets as tn
from ._util import LiteFrame, df_split
if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Sequence
from spacy.tokens import Token
from spacy.tokens.doc import Doc
#: Custom type for objects resembling documents (token sequences).
DocLike = Doc | Sequence[Token]
#: Mapping of language codes to spaCy language model names.
#: ``Corpus.__init__`` looks the requested language up here and falls back to
#: the given string itself when it is not one of these keys.
LANGS = {
    "ca": "ca_core_news_sm", # Catalan
    "da": "da_core_news_sm", # Danish
    "de": "de_core_news_sm", # German
    "el": "el_core_news_sm", # Greek
    "en": "en_core_web_sm", # English
    "es": "es_core_news_sm", # Spanish
    "fi": "fi_core_news_sm", # Finnish
    "fr": "fr_core_news_sm", # French
    "hr": "hr_core_news_sm", # Croatian
    "it": "it_core_news_sm", # Italian
    "ja": "ja_core_news_sm", # Japanese
    "ko": "ko_core_news_sm", # Korean
    "lt": "lt_core_news_sm", # Lithuanian
    "mk": "mk_core_news_sm", # Macedonian
    "nb": "nb_core_news_sm", # Norwegian
    "nl": "nl_core_news_sm", # Dutch
    "pl": "pl_core_news_sm", # Polish
    "pt": "pt_core_news_sm", # Portuguese
    "ro": "ro_core_news_sm", # Romanian
    "ru": "ru_core_news_sm", # Russian
    "sl": "sl_core_news_sm", # Slovenian
    "sv": "sv_core_news_sm", # Swedish
    "uk": "uk_core_news_sm", # Ukrainian
    "zh": "zh_core_web_sm", # Chinese
}
# Result of spacy.util.get_installed_models(), queried once when this module
# is imported. ``Corpus._nlp_pipeline`` appends to it after a model download.
_INSTALLED_MODELS = spacy.util.get_installed_models()
[docs]
class Corpus:
    """
    Corpus of labeled documents.

    Parameters
    ----------
    data : Series
        Series containing the documents. The index must contain document
        labels.
    lang : str, optional
        The language model to use (default: the "lang" entry of
        ``textnets.params``).

    Attributes
    ----------
    documents : Series
        The corpus documents.
    lang : str
        The language model used (ISO code or spaCy model name).

    Raises
    ------
    ValueError
        If the supplied data is empty.
    """
def __init__(
    self,
    data: pd.Series,
    lang: str | None = None,
) -> None:
    """
    Clean the supplied documents and resolve the language model name.

    Missing documents are dropped (with a warning), documents that share a
    label are joined with a blank line between them, and the index is named
    "label". The language falls back to ``tn.params["lang"]`` when ``lang``
    is None and is translated through ``LANGS`` when it is a known code.

    Raises
    ------
    ValueError
        If ``data`` is empty.
    """
    if data.empty:
        raise ValueError("Corpus data is empty.")

    # Work on a copy so the caller's series is left untouched.
    docs: pd.Series = data.copy()

    n_missing = docs.isna().sum()
    if n_missing:
        msg.warn(f"Dropping {n_missing} empty document(s).")
        docs = docs[~docs.isna()]

    n_duplicates = docs.index.duplicated().sum()
    if n_duplicates:
        msg.info(
            f"There are {n_duplicates} duplicate labels. Concatenating documents."
        )
        docs = docs.groupby(level=0).agg("\n\n".join)

    docs.index = docs.index.set_names(["label"])
    self.documents = docs

    requested = tn.params["lang"] if lang is None else lang
    self.lang = LANGS.get(requested, requested)
    if self.lang not in _INSTALLED_MODELS:
        msg.info(f"Language model '{self.lang}' is not yet installed.")
@property
@memoize
def nlp(self) -> pd.Series:
    """
    Corpus documents with NLP applied.

    Whitespace in every document is normalized first. When there is more
    than one CPU core and at least as many documents as cores, the documents
    are split into one chunk per core and processed with ``thread_map``;
    otherwise they are processed one by one with a progress bar. The result
    is memoized.
    """
    normalized: pd.Series = self.documents.map(_normalize_whitespace)

    longest = max(len(text) for text in normalized)
    if longest > 1_000_000:
        msg.info("Corpus contains very long documents. Memory usage will be high.")
        # NOTE(review): the source's indentation was lost; raising the
        # pipeline limit is assumed to belong to this branch - confirm.
        self._nlp_pipeline.max_length = longest

    progress_opts = {
        "disable": not tn.params["progress_bar"] or None,
        "unit": "docs",
    }
    n_cores = cpu_count() or 1
    run_threaded = n_cores > 1 and len(self.documents) >= n_cores

    if not run_threaded:
        tqdm.pandas(**progress_opts)
        return normalized.progress_map(self._nlp_pipeline)

    # Wrap the pipeline so it can be applied element-wise to a whole chunk.
    apply_pipeline = np.frompyfunc(self._nlp_pipeline, 1, 1)
    chunks = df_split(normalized, n_cores)
    processed = thread_map(apply_pipeline, chunks, **progress_opts)
    return pd.concat(processed)
@property
@memoize
def _nlp_pipeline(self) -> spacy.Language:
    """
    Return the spaCy pipeline for ``self.lang`` (memoized).

    Loading is attempted first. If that raises ``OSError`` and the
    "autodownload" parameter is set, the model is downloaded and loaded
    again. If neither yields a model, a blank pipeline for ``self.lang`` is
    returned instead.

    Raises
    ------
    OSError
        If loading fails, "autodownload" is off, and ``self.lang`` is one of
        the model names listed in ``LANGS``.
    """
    # The "ner" and "textcat" components are excluded when loading.
    model_opts: dict[str, dict | list] = {"exclude": ["ner", "textcat"]}
    if self.lang.startswith("zh"):
        # Chinese models are configured to use the jieba segmenter.
        model_opts["config"] = {"nlp": {"tokenizer": {"segmenter": "jieba"}}}
    try:
        return spacy.load(self.lang, **model_opts) # type:ignore
    except OSError as err:
        if tn.params["autodownload"]:
            try:
                spacy.cli.download(self.lang) # type: ignore
                _INSTALLED_MODELS.append(self.lang)
                return spacy.load(self.lang, **model_opts) # type:ignore
            except (KeyError, OSError):
                # Download or second load failed: fall through to the
                # blank pipeline below.
                pass
        elif self.lang in LANGS.values():
            raise err
        # NOTE(review): nesting reconstructed from a source without
        # indentation; the fallback is assumed to sit inside this handler.
        msg.info(f"Using basic '{self.lang}' language model.")
        return spacy.blank(self.lang)
def __len__(self) -> int:
    """Return the number of documents in the corpus."""
    n_documents = len(self.documents)
    return n_documents
def __getitem__(self, key: str) -> str:
    """Return the document stored under the label ``key``."""
    document = self.documents[key]
    return document
[docs]
@classmethod
def from_df(
    cls,
    data: pd.DataFrame,
    doc_col: str | None = None,
    lang: str | None = None,
) -> Self:
    """
    Create corpus from data frame.

    Parameters
    ----------
    data : DataFrame
        DataFrame containing documents. The index must contain document
        labels.
    doc_col : str, optional
        Indicates which column of ``data`` contains the document texts. If
        none is specified, the first column with object dtype is used.
    lang : str, optional
        The language model to use (default set by "lang" parameter).

    Returns
    -------
    `Corpus`

    Raises
    ------
    NoDocumentColumnException
        If no document column is given and none can be detected.
    """
    candidate_cols = data.select_dtypes(include="object").columns
    if doc_col is None:
        if candidate_cols.empty:
            raise NoDocumentColumnException("No suitable document column.")
        doc_col = str(candidate_cols[0])
    frame = data.copy()
    return cls(frame[doc_col], lang=lang)
[docs]
@classmethod
def from_dict(
    cls,
    data: dict[Any, str],
    lang: str | None = None,
) -> Self:
    """
    Create corpus from dictionary.

    Parameters
    ----------
    data : dict
        Dictionary with document labels as keys and the documents as
        values.
    lang : str, optional
        The language model to use (default set by "lang" parameter).

    Returns
    -------
    `Corpus`
    """
    documents = pd.Series(data)
    return cls(documents, lang=lang)
[docs]
@classmethod
def from_files(
    cls,
    files: str | list[str] | list[Path] | Iterator[Path],
    doc_labels: list[str] | None = None,
    lang: str | None = None,
) -> Self:
    """Construct corpus from files.

    Parameters
    ----------
    files : str or list of str or list of Path
        Path to files (with globbing pattern) or list of file paths. A
        leading ``~`` is expanded to the user's home directory.
    doc_labels : list of str, optional
        Labels for documents (default: file name without suffix).
    lang : str, optional
        The language model to use (default set by "lang" parameter).

    Returns
    -------
    `Corpus`

    Raises
    ------
    IsADirectoryError
        If the provided path is a directory. (Use globbing.)
    FileNotFoundError
        If the provided path does not exist.
    """
    if isinstance(files, str):
        pattern = Path(files).expanduser()
        files = pattern.parent.glob(pattern.name)

    # Expand "~" once, up front, so the paths that are validated are the
    # same paths that are read afterwards.
    paths = [Path(f).expanduser() for f in files]

    for path in paths:
        if path.is_file():
            continue
        if path.exists():
            raise IsADirectoryError(path.name)
        raise FileNotFoundError(path.name)

    if not doc_labels:
        doc_labels = [path.stem for path in paths]

    data = pd.DataFrame({"path": paths}, index=doc_labels)
    data["raw"] = data["path"].map(_read_file)
    return cls.from_df(data, doc_col="raw", lang=lang)
[docs]
@classmethod
def from_csv(
    cls,
    path: str,
    label_col: str | None = None,
    doc_col: str | None = None,
    lang: str | None = None,
    **kwargs,
) -> Self:
    """Read corpus from comma-separated value file.

    Parameters
    ----------
    path : str
        Path to CSV file.
    label_col : str, optional
        Column that contains document labels (default: None, in which case
        the first column is used unless ``index_col`` is passed in
        ``kwargs``).
    doc_col : str, optional
        Column that contains document text (default: None, in which case
        the first text column is used).
    lang : str, optional
        The language model to use (default set by "lang" parameter).
    kwargs
        Arguments to pass to `pandas.read_csv`.

    Returns
    -------
    `Corpus`
    """
    kwargs.setdefault("index_col", label_col)
    data = pd.read_csv(path, **kwargs)
    # Fall back to the first column only when no label column was selected
    # at all. An index chosen through ``label_col`` or an explicit
    # ``index_col`` must be kept, not replaced by the next column.
    if isinstance(data.index, pd.RangeIndex):
        data = data.set_index(data.columns[0])
    return cls.from_df(data, doc_col=doc_col, lang=lang)
[docs]
@classmethod
def from_sql(
    cls,
    qry: str,
    conn: str | object,
    label_col: str | None = None,
    doc_col: str | None = None,
    lang: str | None = None,
    **kwargs,
) -> Self:
    """Read corpus from SQL database.

    Parameters
    ----------
    qry : str
        SQL query
    conn : str or object
        Database URI or connection object.
    label_col : str, optional
        Column that contains document labels (default: None, in which case
        the first column is used unless ``index_col`` is passed in
        ``kwargs``).
    doc_col : str, optional
        Column that contains document text (default: None, in which case
        the first text column is used).
    lang : str, optional
        The language model to use (default set by "lang" parameter).
    kwargs
        Arguments to pass to `pandas.read_sql`.

    Returns
    -------
    `Corpus`
    """
    kwargs.setdefault("index_col", label_col)
    data = pd.read_sql(qry, conn, **kwargs)
    # Fall back to the first column only when no label column was selected
    # at all. An index chosen through ``label_col`` or an explicit
    # ``index_col`` must be kept, not replaced by the next column.
    if isinstance(data.index, pd.RangeIndex):
        data = data.set_index(data.columns[0])
    return cls.from_df(data, doc_col=doc_col, lang=lang)
[docs]
def save(self, target: os.PathLike[Any] | str) -> None:
    """
    Save a corpus to file.

    The documents are written to the table ``corpus_documents`` and the
    language to the table ``corpus_meta`` of an SQLite database.

    Parameters
    ----------
    target : str or path
        File to save the corpus to. If the file already contains these
        tables, they are replaced.
    """
    meta = {"lang": self.lang}
    conn = sqlite3.connect(Path(target))
    try:
        # The connection's context manager only commits or rolls back the
        # transaction; closing is done separately in the ``finally`` clause.
        with conn:
            self.documents.to_sql("corpus_documents", conn, if_exists="replace")
            pd.Series(meta, name="values").to_sql(
                "corpus_meta", conn, if_exists="replace", index_label="keys"
            )
    finally:
        conn.close()
[docs]
@classmethod
def load(cls, source: os.PathLike[Any] | str) -> Self:
    """
    Load a corpus from file.

    Parameters
    ----------
    source : str or path
        File to read the corpus from. This should be a file created by
        `Corpus.save`.

    Returns
    -------
    `Corpus`

    Raises
    ------
    FileNotFoundError
        If the specified path does not exist.
    """
    path = Path(source)
    if not path.exists():
        raise FileNotFoundError(f"File '{source}' does not exist.")
    conn = sqlite3.connect(path)
    try:
        # The connection's context manager does not close the connection,
        # so it is closed explicitly once both tables have been read.
        with conn:
            documents = pd.read_sql(
                "SELECT * FROM corpus_documents", conn, index_col="label"
            )
            meta = pd.read_sql(
                "SELECT * FROM corpus_meta", conn, index_col="keys"
            )["values"]
    finally:
        conn.close()
    return cls.from_df(documents, lang=meta["lang"])
[docs]
def tokenized(
    self,
    remove: list[str] | None = None,
    stem: bool = True,
    remove_stop_words: bool = True,
    remove_urls: bool = True,
    remove_numbers: bool = True,
    remove_punctuation: bool = True,
    lower: bool = True,
    sublinear: bool = True,
) -> TidyText:
    """Return tokenized version of corpus in tidy format.

    Parameters
    ----------
    remove : list of str, optional
        Additional tokens to remove.
    stem : bool, optional
        Return token stems (default: True, if available).
    remove_stop_words : bool, optional
        Remove stop words (default: True).
    remove_urls : bool, optional
        Remove URL and email address tokens (default: True).
    remove_numbers : bool, optional
        Remove number tokens (default: True).
    remove_punctuation : bool, optional
        Remove punctuation marks, brackets, and quotation marks
        (default: True).
    lower : bool, optional
        Make lower-case (default: True).
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `TidyText`
        A data frame with document labels (index), tokens (term),
        per-document counts (n), and *tf-idf* weights (term_weight).
    """
    # Stemming is only possible when the pipeline has a lemmatizer.
    has_lemmatizer = "lemmatizer" in self._nlp_pipeline.pipe_names
    use_stems = bool(stem) and has_lemmatizer

    # ``compose`` applies its arguments right to left, so the steps are
    # listed from the last one applied to the first one applied.
    steps = [
        identity if remove is None else partial(_remove_additional, token_list=remove),
        _lower if lower else identity,
        _stem if use_stems else _as_text,
        _remove_stop_words if remove_stop_words else identity,
        _remove_urls if remove_urls else identity,
        _remove_numbers if remove_numbers else identity,
        _remove_punctuation if remove_punctuation else identity,
    ]
    tidy = self._make_tidy_text(compose(*steps))
    return _tf_idf(tidy, sublinear)
[docs]
def noun_phrases(
    self,
    normalize: bool = False,
    remove: list[str] | None = None,
    sublinear: bool = True,
) -> TidyText:
    """Return noun phrases from corpus in tidy format.

    Parameters
    ----------
    normalize : bool, optional
        Return lemmas of noun phrases (default: False).
    remove : list of str, optional
        Additional tokens to remove.
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `TidyText`
        A data frame with document labels (index), noun phrases (term),
        per-document counts (n), and *tf-idf* weights (term_weight).
    """
    extract = partial(_noun_chunks, normalize=normalize)
    if remove is None:
        drop_listed = identity
    else:
        drop_listed = partial(_remove_additional, token_list=remove)
    # Right-to-left composition: extract the phrases, then filter them.
    tidy = self._make_tidy_text(compose(drop_listed, extract))
    return _tf_idf(tidy, sublinear)
[docs]
def ngrams(
    self,
    size: int,
    remove: list[str] | None = None,
    stem: bool = False,
    remove_stop_words: bool = False,
    remove_urls: bool = False,
    remove_numbers: bool = False,
    remove_punctuation: bool = False,
    lower: bool = False,
    sublinear: bool = True,
) -> TidyText:
    """Return n-grams of length n from corpus in tidy format.

    Parameters
    ----------
    size : int
        Size of n-grams to return.
    remove : list of str, optional
        Additional tokens to remove.
    stem : bool, optional
        Return token stems (default: False).
    remove_stop_words : bool, optional
        Remove stop words (default: False).
    remove_urls : bool, optional
        Remove URL and email address tokens (default: False).
    remove_numbers : bool, optional
        Remove number tokens (default: False).
    remove_punctuation : bool, optional
        Remove punctuation marks, brackets, and quotation marks
        (default: False).
    lower : bool, optional
        Make lower-case (default: False).
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `TidyText`
        A data frame with document labels (index), n-grams (term),
        per-document counts (n), and *tf-idf* weights (term_weight).
    """
    # ``compose`` applies its arguments right to left: the token filters run
    # first and the n-grams are assembled from whatever is left.
    steps = [
        partial(_ngrams, n=size),
        identity if remove is None else partial(_remove_additional, token_list=remove),
        _lower if lower else identity,
        _stem if stem else _as_text,
        _remove_stop_words if remove_stop_words else identity,
        _remove_urls if remove_urls else identity,
        _remove_numbers if remove_numbers else identity,
        _remove_punctuation if remove_punctuation else identity,
    ]
    tidy = self._make_tidy_text(compose(*steps))
    return _tf_idf(tidy, sublinear)
def _make_tidy_text(self, func: Callable[[Doc], list[str]]) -> TidyText:
    """
    Count the terms that ``func`` extracts from each processed document.

    Returns a `TidyText` indexed by document label with one row per
    distinct (label, term) pair: the term in column "term" and its number
    of occurrences in that document in column "n".
    """
    # One row per document, one column per term position.
    wide = self.nlp.map(func).apply(pd.Series).reset_index()
    # One row per (document, position); the position column is renamed so
    # that counting it below yields the column "n".
    long = pd.melt(wide, id_vars=["label"], value_name="term")
    long = long.rename(columns={"variable": "n"})
    counts = long.groupby(["label", "term"]).count().reset_index()
    return TidyText(counts.set_index("label"))
def __repr__(self) -> str:
    """Return a one-line summary: class name, document count, language model."""
    cls_name = self.__class__.__name__
    n_docs = len(self.documents)
    return f"<{cls_name} with {n_docs} documents using language model '{self.lang}'>"
def _repr_html_(self) -> str:
    """
    Return an HTML summary of the corpus.

    The summary line shows the class name, the number of documents and the
    language model; a collapsible table (at most 10 rows) shows the
    documents.
    """
    table_opts = {
        "header": False,
        "notebook": False,
        "border": 0,
        "classes": ("full-width", "left-align"),
        "max_rows": 10,
    }
    table_html = pd.DataFrame(self.documents).to_html(**table_opts)
    cls_name = self.__class__.__name__
    n_docs = self.documents.shape[0]
    return f"""
<style scoped>
.full-width {{ width: 100%; }}
.left-align td, .left-align th {{ text-align: left; }}
summary {{
cursor: help;
list-style: none;
}}
details[open] summary {{
margin-bottom: 1em;
}}
</style>
<details>
<summary>
<table class="full-width">
<tr style="font-weight: 600;">
<td style="text-align: left;">
<kbd>{cls_name}</kbd>
</td>
<td style="color: dodgerblue;">
<svg width="1ex" height="1ex">
<rect width="1ex" height="1ex" fill="dodgerblue">
</svg>
Docs: {n_docs}
</td>
<td style="color: darkgray;">
Lang: {self.lang}
</td>
</tr>
</table>
</summary>
{table_html}
</details>"""
def _read_file(file_name: Path) -> str:
    """Return the stripped text of a file, replacing undecodable bytes."""
    raw = file_name.read_bytes()
    text = raw.decode("utf-8", "replace")
    return text.strip()
def _normalize_whitespace(string: str) -> str:
    """Collapse every run of whitespace into one space and trim the ends."""
    pieces = string.split()
    return " ".join(pieces)
def _noun_chunks(doc: Doc, normalize: bool) -> list[str]:
    """
    Return the noun chunks of a document as strings.

    Chunks made up entirely of stop words are skipped. With ``normalize``
    the chunk's lemma is returned; otherwise its tokens are lower-cased and
    joined with single spaces.
    """
    phrases = []
    for chunk in doc.noun_chunks:
        if all(token.is_stop for token in chunk):
            continue
        if normalize:
            phrases.append(chunk.lemma_)
        else:
            phrases.append(" ".join([t.lower_ for t in chunk]))
    return phrases
def _remove_stop_words(doc: DocLike) -> list[Token]:
    """Return the tokens of a document that are not stop words."""
    kept = []
    for token in doc:
        if token.is_stop:
            continue
        kept.append(token)
    return kept
def _remove_urls(doc: DocLike) -> list[Token]:
    """Return the tokens that look like neither a URL nor an email address."""
    kept = []
    for token in doc:
        if not (token.like_url or token.like_email):
            kept.append(token)
    return kept
def _remove_numbers(doc: DocLike) -> list[Token]:
    """Return the tokens of a document that do not look like numbers."""
    kept = []
    for token in doc:
        if token.like_num:
            continue
        kept.append(token)
    return kept
def _remove_punctuation(doc: DocLike) -> list[Token]:
    """Return the tokens that are not punctuation, brackets or quotation marks."""
    kept = []
    for token in doc:
        if token.is_punct or token.is_bracket or token.is_quote:
            continue
        kept.append(token)
    return kept
def _stem(doc: DocLike) -> list[str]:
    """Return the lemma of every token in a document."""
    lemmas = []
    for token in doc:
        lemmas.append(token.lemma_)
    return lemmas
def _as_text(doc: DocLike) -> list[str]:
    """Return the text of every token in a document."""
    texts = []
    for token in doc:
        texts.append(token.text)
    return texts
def _lower(doc: list[str]) -> list[str]:
    """Return a lower-cased copy of a list of strings."""
    lowered = []
    for term in doc:
        lowered.append(term.lower())
    return lowered
def _remove_additional(doc: list[str], token_list: list[str]) -> list[str]:
    """Return the strings of ``doc`` that do not occur in ``token_list``."""
    kept = []
    for term in doc:
        if term in token_list:
            continue
        kept.append(term)
    return kept
def _ngrams(doc: list[str], n: int) -> list[str]:
    """Return all runs of ``n`` consecutive strings, each joined by spaces."""
    # The k-th shifted copy starts at position k; zipping the copies walks
    # a window of width n across the document.
    shifted = [doc[start:] for start in range(n)]
    return [" ".join(window) for window in zip(*shifted)]
def _tf_idf(tidy_text: pd.DataFrame | TidyText, sublinear: bool) -> TidyText:
    """
    Calculate term frequency/inverse document frequency.

    Parameters
    ----------
    tidy_text : DataFrame or TidyText
        Frame indexed by document label with the columns "term" and "n"
        (per-document count of the term). It is not modified.
    sublinear : bool
        If True, the term frequency is the sublinearly scaled count;
        otherwise it is the count divided by the document's total count.

    Returns
    -------
    `TidyText`
        The columns "term", "n" and "term_weight" (term frequency times
        inverse document frequency), with the index of ``tidy_text``.
    """
    counts = tidy_text["n"]
    terms = tidy_text["term"]

    if sublinear:
        tf = counts.map(_sublinear_scaling)
    else:
        # Total count per document, broadcast back to every row of it.
        totals = counts.groupby(counts.index).transform("sum")
        tf = counts / totals

    # Each (document, term) pair occupies one row, so the number of rows per
    # term is the number of documents that contain it.
    n_docs = len(set(tidy_text.index))
    idf = np.log10(n_docs / terms.value_counts())

    # Look the idf up per term rather than merging on a column whose name
    # depends on the pandas version, and build a new frame instead of
    # adding helper columns to the caller's.
    weighted = tidy_text[["term", "n"]].assign(term_weight=tf * terms.map(idf))
    return TidyText(weighted)


def _sublinear_scaling(n: int | float) -> float:
    """Logarithmic scaling function: ``1 + log10(n)`` for positive n, else 0."""
    if n > 0:
        return 1 + np.log10(n)
    return 0
[docs]
class NoDocumentColumnException(Exception):
    """
    Raised if no suitable document column is specified or found.

    `Corpus.from_df` raises it when no ``doc_col`` is given and the data
    frame has no column with object dtype.
    """
[docs]
class TidyText(LiteFrame):
    """
    Collection of tokens with per-document counts.

    As produced by the `Corpus` methods in this module, it is indexed by
    document label and has the columns "term", "n" and "term_weight".
    """