"""Implements the features relating to language."""
from __future__ import annotations
import os
import sqlite3
from glob import glob
from os import cpu_count
from pathlib import Path
from typing import Any, Callable, Sequence, Union
from warnings import warn
import numpy as np
import pandas as pd
import spacy
from spacy.tokens import Token
from spacy.tokens.doc import Doc
from toolz import compose, identity, memoize, partial
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import thread_map
import textnets as tn
from ._util import LiteFrame
#: Mapping of two-letter language codes to the names of the small spaCy
#: language models used for them.
LANGS = {
    "ca": "ca_core_news_sm",  # Catalan
    "da": "da_core_news_sm",  # Danish
    "de": "de_core_news_sm",  # German
    "el": "el_core_news_sm",  # Greek
    "en": "en_core_web_sm",  # English
    "es": "es_core_news_sm",  # Spanish
    "fi": "fi_core_news_sm",  # Finnish
    "fr": "fr_core_news_sm",  # French
    "hr": "hr_core_news_sm",  # Croatian
    "it": "it_core_news_sm",  # Italian
    "ja": "ja_core_news_sm",  # Japanese
    "ko": "ko_core_news_sm",  # Korean
    "lt": "lt_core_news_sm",  # Lithuanian
    "mk": "mk_core_news_sm",  # Macedonian
    "nb": "nb_core_news_sm",  # Norwegian
    "nl": "nl_core_news_sm",  # Dutch
    "pl": "pl_core_news_sm",  # Polish
    "pt": "pt_core_news_sm",  # Portuguese
    "ro": "ro_core_news_sm",  # Romanian
    "ru": "ru_core_news_sm",  # Russian
    "sl": "sl_core_news_sm",  # Slovenian
    "sv": "sv_core_news_sm",  # Swedish
    "uk": "uk_core_news_sm",  # Ukrainian
    "zh": "zh_core_web_sm",  # Chinese
}
#: Custom type for objects resembling documents (token sequences).
DocLike = Union[Doc, Sequence[Token]]
# Names of the spaCy models reported as installed when this module is
# imported. ``Corpus.__init__`` consults it to warn about missing models and
# ``Corpus._nlp`` appends to it after downloading a model on demand.
_INSTALLED_MODELS = spacy.util.get_installed_models()
[docs]
class Corpus:
"""
Corpus of labeled documents.
Parameters
----------
data : Series
Series containing the documents. The index must contain document
labels.
lang : str, optional
The language model to use (default set by the "lang" parameter).
Raises
------
ValueError
If the supplied data is empty.
Attributes
----------
documents : Series
The corpus documents.
lang : str
The language model used (ISO code or spaCy model name).
"""
def __init__(
self,
data: pd.Series,
lang: str | None = None,
) -> None:
if data.empty:
raise ValueError("Corpus data is empty.")
documents = data.copy()
if missings := documents.isna().sum():
warn(f"Dropping {missings} empty document(s).")
documents = documents[~documents.isna()]
if duplicated := documents.index.duplicated().sum():
warn(f"There are {duplicated} duplicate labels. Concatenating documents.")
documents = documents.groupby(level=0).agg("\n\n".join)
documents.index = documents.index.set_names(["label"])
self.documents = documents
if lang is None:
lang = tn.params["lang"]
self.lang = LANGS.get(lang, lang)
if self.lang not in _INSTALLED_MODELS:
warn(f"Language model '{self.lang}' is not yet installed.")
@property
def nlp(self) -> pd.Series:
    """Corpus documents parsed with the corpus' language model."""
    parsed_documents = self._nlp(self.lang)
    return parsed_documents
@memoize
def _nlp(self, lang: str) -> pd.Series:
try:
nlp = spacy.load(lang, exclude=["ner", "textcat"])
except OSError as err:
if tn.params["autodownload"]:
try:
spacy.cli.download(lang) # type: ignore
_INSTALLED_MODELS.append(lang)
return self._nlp(lang)
except (KeyError, OSError):
pass
elif lang in LANGS.values():
raise err
nlp = spacy.blank(lang)
warn(f"Using basic '{lang}' language model.")
norm_docs = self.documents.map(_normalize_whitespace)
max_length = max(map(len, norm_docs))
if max_length > 1_000_000:
warn("Corpus contains very long documents. Memory usage will be high.")
nlp.max_length = max_length
tqdm_args = dict(disable=not tn.params["progress_bar"] or None, unit="docs")
cores = cpu_count() or 1
if cores > 1 and len(self.documents) >= cores:
nlp_ufunc = np.frompyfunc(nlp, 1, 1)
doc_chunks = np.array_split(norm_docs, cores)
return pd.concat(thread_map(nlp_ufunc, doc_chunks, **tqdm_args))
tqdm.pandas(**tqdm_args)
return norm_docs.progress_map(nlp)
def __len__(self) -> int:
return len(self.documents)
def __getitem__(self, key: str) -> str:
return self.documents[key]
[docs]
@classmethod
def from_df(
    cls,
    data: pd.DataFrame,
    doc_col: str | None = None,
    lang: str | None = None,
) -> Corpus:
    """
    Create corpus from data frame.

    Parameters
    ----------
    data : DataFrame
        DataFrame containing documents. The index must contain document
        labels.
    doc_col : str, optional
        Indicates which column of ``data`` contains the document texts. If
        none is specified, the first column with strings is used.
    lang : str, optional
        The language model to use (default set by the "lang" parameter).

    Raises
    ------
    NoDocumentColumnException
        If no document column can be detected.

    Returns
    -------
    `Corpus`
    """
    if doc_col is None:
        # Fall back to the first column with ``object`` dtype.
        candidates = data.select_dtypes(include="object").columns
        if candidates.empty:
            raise NoDocumentColumnException("No suitable document column.")
        doc_col = str(candidates[0])
    texts = data.copy()[doc_col]
    return cls(texts, lang=lang)
[docs]
@classmethod
def from_dict(
    cls,
    data: dict[Any, str],
    lang: str | None = None,
) -> Corpus:
    """
    Create corpus from dictionary.

    Parameters
    ----------
    data : dict
        Dictionary containing the documents as values and document labels
        as keys.
    lang : str, optional
        The language model to use (default set by the "lang" parameter).

    Returns
    -------
    `Corpus`
    """
    documents = pd.Series(data)
    return cls(documents, lang=lang)
[docs]
@classmethod
def from_files(
    cls,
    files: str | list[str] | list[Path],
    doc_labels: list[str] | None = None,
    lang: str | None = None,
) -> Corpus:
    """Construct corpus from files.

    Parameters
    ----------
    files : str or list of str or list of Path
        Path to files (with globbing pattern) or list of file paths. A
        leading ``~`` is expanded to the user's home directory.
    doc_labels : list of str, optional
        Labels for documents (default: file name without suffix).
    lang : str, optional
        The language model to use (default set by the "lang" parameter).

    Raises
    ------
    IsADirectoryError
        If the provided path is a directory. (Use globbing.)
    FileNotFoundError
        If the provided path does not exist.

    Returns
    -------
    `Corpus`
    """
    if isinstance(files, str):
        files = glob(os.path.expanduser(files))
    # Expand ``~`` once, so the paths that are validated below are the very
    # same paths that are read afterwards.
    paths = [Path(f).expanduser() for f in files]
    for path in paths:
        if path.is_file():
            continue
        if path.exists():
            raise IsADirectoryError(path.name)
        raise FileNotFoundError(path.name)
    if not doc_labels:
        doc_labels = [path.stem for path in paths]
    data = pd.DataFrame({"path": paths}, index=doc_labels)
    data["raw"] = data["path"].map(_read_file)
    return cls.from_df(data, doc_col="raw", lang=lang)
[docs]
@classmethod
def from_csv(
    cls,
    path: str,
    label_col: str | None = None,
    doc_col: str | None = None,
    lang: str | None = None,
    **kwargs,
) -> Corpus:
    """Read corpus from comma-separated value file.

    Parameters
    ----------
    path : str
        Path to CSV file.
    label_col : str, optional
        Column that contains document labels (default: None, in which case
        the first column is used).
    doc_col : str, optional
        Column that contains document text (default: None, in which case
        the first text column is used).
    lang : str, optional
        The language model to use (default set by the "lang" parameter).
    kwargs
        Arguments to pass to `pandas.read_csv`.

    Returns
    -------
    `Corpus`
    """
    # An explicitly passed ``index_col`` takes precedence over ``label_col``.
    kwargs.setdefault("index_col", label_col)
    frame = pd.read_csv(path, **kwargs)
    has_default_index = isinstance(frame.index, pd.RangeIndex)
    if has_default_index or not label_col:
        # No usable label column was selected: use the first column.
        first_column = frame.columns[0]
        frame = frame.set_index(first_column)
    return cls.from_df(frame, doc_col=doc_col, lang=lang)
[docs]
@classmethod
def from_sql(
    cls,
    qry: str,
    conn: str | object,
    label_col: str | None = None,
    doc_col: str | None = None,
    lang: str | None = None,
    **kwargs,
) -> Corpus:
    """Read corpus from SQL database.

    Parameters
    ----------
    qry : str
        SQL query
    conn : str or object
        Database URI or connection object.
    label_col : str, optional
        Column that contains document labels (default: None, in which case
        the first column is used).
    doc_col : str, optional
        Column that contains document text (default: None, in which case
        the first text column is used).
    lang : str, optional
        The language model to use (default set by the "lang" parameter).
    kwargs
        Arguments to pass to `pandas.read_sql`.

    Returns
    -------
    `Corpus`
    """
    # An explicitly passed ``index_col`` takes precedence over ``label_col``.
    kwargs.setdefault("index_col", label_col)
    frame = pd.read_sql(qry, conn, **kwargs)
    has_default_index = isinstance(frame.index, pd.RangeIndex)
    if has_default_index or not label_col:
        # No usable label column was selected: use the first column.
        first_column = frame.columns[0]
        frame = frame.set_index(first_column)
    return cls.from_df(frame, doc_col=doc_col, lang=lang)
[docs]
def save(self, target: os.PathLike[Any] | str) -> None:
    """
    Save a corpus to file.

    The documents and the name of the language model are written to two
    tables (``corpus_documents`` and ``corpus_meta``) of an SQLite database.

    Parameters
    ----------
    target : str or path
        File to save the corpus to. If the file exists, it will be
        overwritten.
    """
    from contextlib import closing

    meta = pd.Series({"lang": self.lang}, name="values")
    # ``with conn`` only commits or rolls back the transaction; ``closing``
    # makes sure the connection itself is released as well.
    with closing(sqlite3.connect(Path(target))) as conn:
        with conn:
            self.documents.to_sql("corpus_documents", conn, if_exists="replace")
            meta.to_sql(
                "corpus_meta", conn, if_exists="replace", index_label="keys"
            )
[docs]
@classmethod
def load(cls, source: os.PathLike[Any] | str) -> Corpus:
    """
    Load a corpus from file.

    Parameters
    ----------
    source : str or path
        File to read the corpus from. This should be a file created by
        `Corpus.save`.

    Raises
    ------
    FileNotFoundError
        If the specified path does not exist.

    Returns
    -------
    `Corpus`
    """
    from contextlib import closing

    path = Path(source)
    if not path.exists():
        raise FileNotFoundError(f"File '{source}' does not exist.")
    # ``with conn`` only manages the transaction; ``closing`` makes sure the
    # connection itself is released once both tables have been read.
    with closing(sqlite3.connect(path)) as conn:
        with conn:
            documents = pd.read_sql(
                "SELECT * FROM corpus_documents", conn, index_col="label"
            )
            meta = pd.read_sql(
                "SELECT * FROM corpus_meta", conn, index_col="keys"
            )["values"]
    return cls.from_df(documents, lang=meta["lang"])
[docs]
def tokenized(
    self,
    remove: list[str] | None = None,
    stem: bool = True,
    remove_stop_words: bool = True,
    remove_urls: bool = True,
    remove_numbers: bool = True,
    remove_punctuation: bool = True,
    lower: bool = True,
    sublinear: bool = True,
) -> TidyText:
    """Return tokenized version of corpus in tidy format.

    Parameters
    ----------
    remove : list of str, optional
        Additional tokens to remove.
    stem : bool, optional
        Return token stems (default: True).
    remove_stop_words : bool, optional
        Remove stop words (default: True).
    remove_urls : bool, optional
        Remove URL and email address tokens (default: True).
    remove_numbers : bool, optional
        Remove number tokens (default: True).
    remove_punctuation : bool, optional
        Remove punctuation marks, brackets, and quotation marks
        (default: True).
    lower : bool, optional
        Make lower-case (default: True).
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `pandas.DataFrame`
        A data frame with document labels (index), tokens (term), and
        per-document counts (n).
    """
    if remove is None:
        drop_extra = identity
    else:
        drop_extra = partial(_remove_additional, token_list=remove)
    # ``compose`` applies its arguments right to left, so the pipeline is
    # listed from the last step to the first.
    steps = [
        drop_extra,
        _lower if lower else identity,
        _stem if stem else _as_text,
        _remove_stop_words if remove_stop_words else identity,
        _remove_urls if remove_urls else identity,
        _remove_numbers if remove_numbers else identity,
        _remove_punctuation if remove_punctuation else identity,
    ]
    pipeline = compose(*steps)
    tidy = self._make_tidy_text(pipeline)
    return _tf_idf(tidy, sublinear)
[docs]
def noun_phrases(
    self,
    normalize: bool = False,
    remove: list[str] | None = None,
    sublinear: bool = True,
) -> TidyText:
    """Return noun phrases from corpus in tidy format.

    Parameters
    ----------
    normalize : bool, optional
        Return lemmas of noun phrases (default: False).
    remove : list of str, optional
        Additional tokens to remove.
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `pandas.DataFrame`
        A data frame with document labels (index), noun phrases
        (term), and per-document counts (n).
    """
    if remove is None:
        drop_extra = identity
    else:
        drop_extra = partial(_remove_additional, token_list=remove)
    extract_chunks = partial(_noun_chunks, normalize=normalize)
    # ``compose`` applies right to left: extract the chunks first, then
    # drop the unwanted ones.
    pipeline = compose(drop_extra, extract_chunks)
    tidy = self._make_tidy_text(pipeline)
    return _tf_idf(tidy, sublinear)
[docs]
def ngrams(
    self,
    size: int,
    remove: list[str] | None = None,
    stem: bool = False,
    remove_stop_words: bool = False,
    remove_urls: bool = False,
    remove_numbers: bool = False,
    remove_punctuation: bool = False,
    lower: bool = False,
    sublinear: bool = True,
) -> TidyText:
    """Return n-grams of length n from corpus in tidy format.

    Parameters
    ----------
    size : int
        Size of n-grams to return.
    remove : list of str, optional
        Additional tokens to remove.
    stem : bool, optional
        Return token stems (default: False).
    remove_stop_words : bool, optional
        Remove stop words (default: False).
    remove_urls : bool, optional
        Remove URL and email address tokens (default: False).
    remove_numbers : bool, optional
        Remove number tokens (default: False).
    remove_punctuation : bool, optional
        Remove punctuation marks, brackets, and quotation marks
        (default: False).
    lower : bool, optional
        Make lower-case (default: False).
    sublinear : bool, optional
        Apply sublinear scaling when calculating *tf-idf* term weights
        (default: True).

    Returns
    -------
    `pandas.DataFrame`
        A data frame with document labels (index), n-grams (term), and
        per-document counts (n).
    """
    if remove is None:
        drop_extra = identity
    else:
        drop_extra = partial(_remove_additional, token_list=remove)
    # ``compose`` applies its arguments right to left, so the n-grams are
    # built last, from the tokens that survived all filters.
    steps = [
        partial(_ngrams, n=size),
        drop_extra,
        _lower if lower else identity,
        _stem if stem else _as_text,
        _remove_stop_words if remove_stop_words else identity,
        _remove_urls if remove_urls else identity,
        _remove_numbers if remove_numbers else identity,
        _remove_punctuation if remove_punctuation else identity,
    ]
    pipeline = compose(*steps)
    tidy = self._make_tidy_text(pipeline)
    return _tf_idf(tidy, sublinear)
def _make_tidy_text(self, func: Callable[[Doc], list[str]]) -> TidyText:
    """Count the terms ``func`` extracts from each processed document.

    The result has one row per document label (index) and term, with the
    number of occurrences in column ``n``.
    """
    # One row per document, one column per term position.
    wide = self.nlp.map(func).apply(pd.Series).reset_index()
    # One row per (document, position, term).
    long = pd.melt(wide, id_vars=["label"], value_name="term")
    # Counting the position column per (label, term) gives the frequency.
    counted = long.rename(columns={"variable": "n"}).groupby(["label", "term"]).count()
    tidy = counted.reset_index().set_index("label")
    return TidyText(tidy)
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} with {len(self.documents)} documents "
+ f"using language model '{self.lang}'>"
)
def _repr_html_(self) -> str:
tbl = pd.DataFrame(self.documents).to_html(
header=False,
notebook=False,
border=0,
classes=("full-width", "left-align"),
max_rows=10,
)
return f"""
<style scoped>
.full-width {{ width: 100%; }}
.left-align td, .left-align th {{ text-align: left; }}
summary {{
cursor: help;
list-style: none;
}}
details[open] summary {{
margin-bottom: 1em;
}}
</style>
<details>
<summary>
<table class="full-width">
<tr style="font-weight: 600;">
<td style="text-align: left;">
<kbd>{self.__class__.__name__}</kbd>
</td>
<td style="color: dodgerblue;">
<svg width="1ex" height="1ex">
<rect width="1ex" height="1ex" fill="dodgerblue">
</svg>
Docs: {self.documents.shape[0]}
</td>
<td style="color: darkgray;">
Lang: {self.lang}
</td>
</tr>
</table>
</summary>
{tbl}
</details>"""
def _read_file(file_name: Path) -> str:
"""Read contents of file ignoring any unicode errors."""
return file_name.read_bytes().decode("utf-8", "replace").strip()
def _normalize_whitespace(string: str) -> str:
"""Replace all whitespace with single spaces."""
return " ".join(string.split())
def _noun_chunks(doc: Doc, normalize: bool) -> list[str]:
"""Return only the noun chunks in lower case."""
return [
(chunk.lemma_ if normalize else " ".join([t.lower_ for t in chunk]))
for chunk in doc.noun_chunks
if not all(token.is_stop for token in chunk)
]
def _remove_stop_words(doc: DocLike) -> list[Token]:
"""Return document without stop words."""
return [word for word in doc if not word.is_stop]
def _remove_urls(doc: DocLike) -> list[Token]:
"""Return document without URLs or email addresses."""
return [word for word in doc if not word.like_url and not word.like_email]
def _remove_numbers(doc: DocLike) -> list[Token]:
"""Return document without numbers."""
return [word for word in doc if not word.like_num]
def _remove_punctuation(doc: DocLike) -> list[Token]:
"""Return document without punctuation, brackets and quotation marks."""
return [
word
for word in doc
if not word.is_punct and not word.is_bracket and not word.is_quote
]
def _stem(doc: DocLike) -> list[str]:
"""Return list of word stem strings."""
return [word.lemma_ for word in doc]
def _as_text(doc: DocLike) -> list[str]:
"""Turn document into list of strings."""
return [word.text for word in doc]
def _lower(doc: list[str]) -> list[str]:
"""Return list of strings in lower case."""
return [s.lower() for s in doc]
def _remove_additional(doc: list[str], token_list: list[str]) -> list[str]:
"""Return list of strings without specified tokens."""
return [s for s in doc if s not in token_list]
def _ngrams(doc: list[str], n: int) -> list[str]:
"""Return list of n-gram strings."""
return [" ".join(t) for t in zip(*[doc[offset:] for offset in range(n)])]
def _tf_idf(tidy_text: pd.DataFrame | TidyText, sublinear: bool) -> TidyText:
    """Calculate term frequency/inverse document frequency.

    Parameters
    ----------
    tidy_text : DataFrame
        Data frame indexed by document label with the columns ``term`` and
        ``n`` (number of occurrences of the term in the document). It is
        not modified.
    sublinear : bool
        If set, the term frequency is the logarithmically scaled count.
        Otherwise it is the count divided by the sum of all counts of the
        same document.

    Returns
    -------
    `TidyText`
        The columns ``term``, ``n`` and ``term_weight`` (term frequency
        multiplied by inverse document frequency).
    """
    if sublinear:
        # ``assign`` returns a new frame, leaving the caller's data untouched.
        tidy_text = tidy_text.assign(tf=tidy_text["n"].map(_sublinear_scaling))
    else:
        # Only the counts are summed; the terms themselves are not needed
        # for the per-document totals.
        totals = (
            tidy_text.groupby(tidy_text.index)[["n"]]
            .sum()
            .rename(columns={"n": "total"})
        )
        tidy_text = tidy_text.merge(totals, right_index=True, left_index=True)
        tidy_text["tf"] = tidy_text["n"] / tidy_text["total"]
    n_docs = len(set(tidy_text.index))
    # Each (document, term) pair occupies one row, so the number of rows per
    # term is the number of documents containing it. The result is named
    # explicitly because the name pandas gives to ``value_counts`` output
    # differs between versions.
    idfs = np.log10(n_docs / tidy_text["term"].value_counts()).rename("idf")
    tt = tidy_text.merge(idfs.to_frame(), left_on="term", right_index=True)
    tt["term_weight"] = tt["tf"] * tt["idf"]
    return TidyText(tt[["term", "n", "term_weight"]])
def _sublinear_scaling(n: int | float) -> float:
"""Logarithmic scaling function."""
return 1 + np.log10(n) if n > 0 else 0
[docs]
class NoDocumentColumnException(Exception):
    """Signals that no column holding document texts was given or detected."""
[docs]
class TidyText(LiteFrame):
    """Tokens (or other terms) together with their per-document counts."""