Source code for textnets.network

"""Implements the features relating to networks."""

from __future__ import annotations

import json
import os
import sqlite3
import warnings
from abc import ABC, abstractmethod
from collections import Counter
from enum import Flag
from functools import cached_property
from os import cpu_count
from pathlib import Path
from typing import Any, Callable, IO, Iterator, Literal
from warnings import warn

import igraph as ig
import leidenalg as la
import numpy as np
import pandas as pd
from scipy import LowLevelCallable
from scipy.integrate import quad
from toolz import memoize
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import thread_map

import textnets as tn
from ._util import LiteFrame

from .corpus import TidyText
from .fca import FormalContext
from .viz import decorate_plot

try:
    from . import _ext  # type: ignore

    integrand = LowLevelCallable.from_cython(_ext, "df_integrand")
except ImportError:

    def integrand(x: float, degree: int) -> np.float64:
        """Fallback version of integrand function for the disparity filter."""
        return np.float64(1 - x) ** (degree - 2)

    warn("Could not import compiled extension, backbone extraction will be slow.")
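
# Sanity check for the fallback (a sketch, not part of the library): the
# disparity-filter integral of (1 - x)**(degree - 2) from 0 to w has the
# closed form (1 - (1 - w)**(degree - 1)) / (degree - 1), so the slow path
# can be spot-checked against it:
#
#     >>> from scipy.integrate import quad
#     >>> w, k = 0.3, 5
#     >>> numeric, _ = quad(integrand, 0, w, args=(k,))
#     >>> abs(numeric - (1 - (1 - w) ** (k - 1)) / (k - 1)) < 1e-9
#     True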


#: Flag to distinguish node types.
NodeType = Flag("NodeType", [("TERM", True), ("DOC", False)])
TERM = NodeType.TERM
DOC = NodeType.DOC
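
# Usage sketch: the flag values make term nodes truthy and document nodes
# falsy, which lets the node-type list double as the boolean type vector
# that igraph's bipartite routines expect.
#
#     >>> bool(TERM), bool(DOC)
#     (True, False)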


def _make_top(prop, desc):
    """Helper function to create top_* methods for Textnet classes."""

    def method(cls, n=10):
        return getattr(cls, prop).sort_values(ascending=False).head(n)

    method.__doc__ = f"""
        Show nodes sorted by {desc}.

        Parameters
        ----------
        n : int, optional
            How many nodes to show (default: 10).

        Returns
        -------
        `pandas.Series`
            Ranked nodes.
        """
    return method


class TextnetBase(ABC):
    """
    Abstract base class for `Textnet` and `ProjectedTextnet`.

    Attributes
    ----------
    graph : `igraph.Graph`
        Direct access to the igraph object.
    """

    def __init__(self, graph: ig.Graph) -> None:
        self.graph: ig.Graph = graph

    @property
    def summary(self) -> str:
        """Summary of underlying graph."""
        return self.graph.summary()

    @property
    def nodes(self) -> ig.VertexSeq:
        """Iterate over nodes."""
        return self.graph.vs

    @property
    def edges(self) -> ig.EdgeSeq:
        """Iterate over edges."""
        return self.graph.es

    def vcount(self) -> int:
        """Return the number of vertices (nodes)."""
        return self.graph.vcount()

    def ecount(self) -> int:
        """Return the number of edges."""
        return self.graph.ecount()

    def save_graph(
        self,
        target: str | bytes | os.PathLike[Any] | IO,
        format: str | None = None,
    ) -> None:
        """
        Save the underlying graph.

        Parameters
        ----------
        target : str or path or file
            File or path that the graph should be written to.
        format : {"dot", "edgelist", "gml", "graphml", "pajek", ...}, optional
            Optionally specify the desired format (otherwise it is inferred
            from the file suffix).
        """
        self.graph.write(target, format)

    @cached_property
    def degree(self) -> pd.Series:
        """Unweighted node degree."""
        return pd.Series(self.graph.degree(), index=self.nodes["id"])

    @cached_property
    def strength(self) -> pd.Series:
        """Weighted node degree."""
        return pd.Series(
            self.graph.strength(weights="weight"), index=self.nodes["id"]
        )

    @cached_property
    def node_types(self) -> list[NodeType]:
        """Return list of node types."""
        return [TERM if t == "term" else DOC for t in self.nodes["type"]]

    @abstractmethod
    def plot(self, **kwargs) -> ig.Plot:
        pass

    @abstractmethod
    def _partition_graph(self, resolution: float, seed: int) -> ig.VertexClustering:
        pass

    _partition: ig.VertexClustering | None = None

    @property
    def clusters(self) -> ig.VertexClustering:
        """
        Return graph partition.

        The partition is detected by the Leiden algorithm, unless a
        different partition was supplied to the setter.
        """
        if self._partition is None:
            self._partition = self._partition_graph(
                resolution=tn.params["resolution_parameter"],
                seed=tn.params["seed"],
            )
        return self._partition

    @clusters.setter
    def clusters(
        self,
        value: ig.VertexClustering
        | ig.VertexDendrogram
        | dict[int, list[int]]
        | list[int],
    ) -> None:
        if isinstance(value, ig.VertexClustering):
            self._partition = value
        elif isinstance(value, ig.VertexDendrogram):
            self._partition = value.as_clustering()
        elif isinstance(value, dict):
            sorted_node_community_map = dict(sorted(value.items()))
            part = ig.VertexClustering(
                self.graph,
                membership=[i[0] for i in sorted_node_community_map.values()],
            )
            self._partition = part
        elif isinstance(value, list):
            part = ig.VertexClustering(self.graph, membership=value)
            self._partition = part
        else:
            raise ValueError("No valid clusters supplied.")

    @clusters.deleter
    def clusters(self) -> None:
        self._partition = None

    @property
    def modularity(self) -> float:
        """Return modularity based on graph partition."""
        return self.graph.modularity(self.clusters, weights="weight")

    @property
    def cluster_strength(self) -> pd.Series:
        """Weighted node degree within each cluster's subgraph."""
        d = {}
        for c in self.clusters:
            subgraph = self.graph.subgraph(c)
            subgraph_strength = subgraph.strength(weights="weight")
            for n, strength in zip(c, subgraph_strength):
                node = self.nodes[n]
                d[node["id"]] = strength
        return pd.Series(d).reindex(self.strength.index)

    @property
    def cluster_local_cc(self) -> pd.Series:
        """Weighted local clustering coefficient within each cluster's subgraph."""
        d = {}
        for c in self.clusters:
            subgraph = self.graph.subgraph(c)
            subgraph_cc = subgraph.transitivity_local_undirected(weights="weight")
            for n, cc in zip(c, subgraph_cc):
                node = self.nodes[n]
                d[node["id"]] = cc
        return pd.Series(d).reindex(self.strength.index)

    def top_cluster_nodes(
        self, n: int = 10, rank_nodes_by: str = "cluster_strength"
    ) -> pd.DataFrame:
        """
        Show top nodes per cluster, ranked by a chosen metric.

        Parameters
        ----------
        n : int, optional
            How many nodes to show per cluster (default: 10).
        rank_nodes_by : str, optional
            Metric to rank nodes within each cluster by
            (default: ``cluster_strength``).

        Returns
        -------
        `pandas.DataFrame`
            Clusters with representative nodes.
        """
        return (
            pd.DataFrame(
                {
                    "nodes": self.nodes["id"],
                    "metric": getattr(self, rank_nodes_by),
                    "cluster": self.clusters.membership,
                }
            )
            .sort_values("metric", ascending=False)
            .groupby("cluster")
            .agg({"nodes": lambda x: x[:n], "metric": len})
            .rename(columns={"metric": "size"})
        )

    @decorate_plot
    def _plot(self, **kwargs) -> ig.Plot:
        tn.init_seed()
        return ig.plot(self.graph, **kwargs)

    def __repr__(self) -> str:
        type_counts: Counter[Literal["doc", "term"]] = Counter(self.nodes["type"])
        return (
            f"""<{self.__class__.__name__} with {type_counts["doc"]} documents, """
            + f"""{type_counts["term"]} terms, and {self.ecount()} edges>"""
        )

    def _repr_html_(self) -> str:
        type_counts: Counter[Literal["doc", "term"]] = Counter(self.nodes["type"])
        return f"""
          <style scoped>
            .full-width {{ width: 100%; }}
          </style>
          <table class="full-width">
            <tr style="font-weight: 600;">
              <td style="text-align: left;">
                <kbd>{self.__class__.__name__}</kbd>
              </td>
              <td style="color: dodgerblue;">
                <svg width="1ex" height="1ex">
                  <rect width="1ex" height="1ex" fill="dodgerblue" />
                </svg>
                Docs: {type_counts["doc"]}
              </td>
              <td style="color: orangered;">
                <svg width="1ex" height="1ex">
                  <circle cx="50%" cy="50%" r="50%" fill="orangered" />
                </svg>
                Terms: {type_counts["term"]}
              </td>
              <td style="color: darkgray;">
                <svg width="2ex" height="1ex">
                  <line x1="0" y1="50%" x2="100%" y2="50%"
                        stroke="darkgray" stroke-width="2" />
                </svg>
                Edges: {self.ecount()}
              </td>
            </tr>
          </table>"""


for prop, desc in [("degree", "unweighted degree"), ("strength", "weighted degree")]:
    setattr(TextnetBase, f"top_{prop}", _make_top(prop, desc))
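
# Hypothetical usage of the dynamically attached rankings (sketch; `net` is
# any TextnetBase instance):
#
#     >>> net.top_degree()       # ten nodes with the highest unweighted degree
#     >>> net.top_strength(n=5)  # five nodes with the highest weighted degree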


class Textnet(TextnetBase, FormalContext):
    """
    Textnet for the relational analysis of meanings.

    A textnet is a bipartite network of documents and terms. Links exist
    only between the two types of nodes. Documents have a tie with terms
    they contain; the tie is weighted by *tf-idf*.

    The bipartite network can be projected into two different kinds of
    single-mode networks: document-to-document, and term-to-term.

    Experimental: The underlying bipartite adjacency matrix can also be
    turned into a formal context, which can be used to construct a concept
    lattice.

    Parameters
    ----------
    data : TidyText or BiadjacencyMatrix
        * DataFrame of tokens with per-document counts, as created by
          `Corpus.tokenized`, `Corpus.ngrams`, and `Corpus.noun_phrases`.
        * A bipartite adjacency matrix relating documents to terms.
    min_docs : int, optional
        Minimum number of documents a term must appear in to be included
        in the network (default: 2).
    connected : bool, optional
        Keep only the largest connected component of the network
        (default: False).
    remove_weak_edges : bool, optional
        Remove edges with weights far below average (default: False).
    doc_attrs : dict of dict, optional
        Additional attributes of document nodes.

    Raises
    ------
    ValueError
        If the supplied data is empty.
    """

    def __init__(
        self,
        data: TidyText | BiadjacencyMatrix | pd.DataFrame,
        min_docs: int = 2,
        connected: bool = False,
        remove_weak_edges: bool = False,
        doc_attrs: dict[str, dict[str, Any]] | None = None,
    ) -> None:
        self._connected = connected
        self._doc_attrs = doc_attrs
        if data.empty:
            raise ValueError("Data is empty.")
        if isinstance(data, BiadjacencyMatrix):
            self._matrix = data
        elif isinstance(data, (TidyText, pd.DataFrame)):
            self._matrix = _matrix_from_tidy_text(data, min_docs)
        if remove_weak_edges:
            pairs: pd.Series = self._matrix.stack()
            edge_weights: pd.Series = pairs[pairs > 0]
            iqr: float = edge_weights.quantile(0.75) - edge_weights.quantile(0.25)
            cutoff: float = edge_weights.median() - 1.5 * iqr
            if cutoff > 0:
                self._matrix = BiadjacencyMatrix(
                    self._matrix[self._matrix > cutoff].dropna(how="all").fillna(0)
                )

    @cached_property
    def graph(self) -> ig.Graph:
        """Direct access to the underlying igraph object."""
        g = _graph_from_matrix(self._matrix)
        if self._doc_attrs is not None:
            for name, attr in self._doc_attrs.items():
                g.vs[name] = [attr.get(doc) for doc in g.vs["id"]]
        if self._connected:
            return giant_component(g)
        return g

    @cached_property
    def m(self) -> BiadjacencyMatrix:
        """Weighted bipartite adjacency matrix of the bipartite graph."""
        if not self._connected:
            return self._matrix
        a = np.array(self.graph.get_biadjacency(types=self.node_types)[0]).astype(
            "float64"
        )
        a[a == 1] = self.edges["weight"]
        doc_count, _ = a.shape
        m = BiadjacencyMatrix(
            a,
            index=self.nodes["id"][:doc_count],
            columns=self.nodes["id"][doc_count:],
        )
        m.T.index.name = "term"
        return m

    def project(
        self,
        *,
        node_type: Literal["doc", "term"] | NodeType,
        connected: bool | None = False,
    ) -> ProjectedTextnet:
        """
        Project to one-mode network.

        Parameters
        ----------
        node_type : {NodeType.DOC, NodeType.TERM, "doc", "term"}
            Either `DOC` or `TERM`, depending on the desired node type.
        connected : bool, optional
            Keep only the largest connected component of the projected
            network (default: False).

        Raises
        ------
        ValueError
            If no valid node type is specified.

        Returns
        -------
        `ProjectedTextnet`
            A one-mode textnet.
        """
        if not isinstance(node_type, NodeType) and node_type not in {"doc", "term"}:
            raise ValueError("No valid node_type specified.")
        graph_to_return = 0
        if node_type in (TERM, "term"):
            graph_to_return = 1
            sparse_array = self.m.to_sparse_array()
            weights = sparse_array.T @ sparse_array
        else:
            array = self.m.to_array()
            weights = array @ array.T
        g = self.graph.bipartite_projection(
            types=self.node_types, which=graph_to_return
        )
        pairs = [
            (s.index, t.index)
            for s, t in (g.es[i].vertex_tuple for i in g.es.indices)
        ]
        g.es.set_attribute_values("weight", [weights[i] for i in pairs])
        g.es["cost"] = [
            1 / pow(w, tn.params["tuning_parameter"]) for w in g.es["weight"]
        ]
        if connected:
            g = giant_component(g)
        return ProjectedTextnet(g)

    def save(self, target: os.PathLike[Any] | str) -> None:
        """
        Save a textnet to file.

        Parameters
        ----------
        target : str or path
            File to save the textnet to. If the file exists, it will be
            overwritten.
        """
        conn = sqlite3.connect(Path(target))
        meta = {
            "connected": self._connected,
            "doc_attrs": json.dumps(self._doc_attrs),
        }
        with conn, warnings.catch_warnings():
            warnings.simplefilter("ignore")  # catch warning from pandas.to_sql
            self.m.T.to_sql("textnet_im", conn, if_exists="replace")
            pd.Series(meta, name="values").to_sql(
                "textnet_meta", conn, if_exists="replace", index_label="keys"
            )

    @classmethod
    def load(cls, source: os.PathLike[Any] | str) -> Textnet:
        """
        Load a textnet from file.

        Parameters
        ----------
        source : str or path
            File to read the textnet from. This should be a file created
            by `Textnet.save`.

        Raises
        ------
        FileNotFoundError
            If the provided path does not exist.

        Returns
        -------
        `Textnet`
        """
        if not Path(source).exists():
            raise FileNotFoundError(f"File '{source}' does not exist.")
        conn = sqlite3.connect(Path(source))
        with conn as c:
            m = pd.read_sql("SELECT * FROM textnet_im", c, index_col="term")
            meta = pd.read_sql("SELECT * FROM textnet_meta", c, index_col="keys")[
                "values"
            ]
        connected = meta["connected"] == "1"
        doc_attrs = json.loads(meta["doc_attrs"])
        return cls(BiadjacencyMatrix(m.T), connected=connected, doc_attrs=doc_attrs)
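
    # Round-trip sketch (hypothetical file name; `net` is an existing
    # Textnet). `save` writes the weighted matrix and metadata to SQLite,
    # and `load` reconstructs an equivalent network from them:
    #
    #     >>> net.save("example.db")
    #     >>> restored = Textnet.load("example.db")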

    def plot(
        self,
        *,
        color_clusters: bool | ig.VertexClustering = False,
        show_clusters: bool | ig.VertexClustering = False,
        bipartite_layout: bool = False,
        sugiyama_layout: bool = False,
        circular_layout: bool = False,
        kamada_kawai_layout: bool = False,
        drl_layout: bool = False,
        node_opacity: float | None = None,
        edge_opacity: float | None = None,
        label_term_nodes: bool = False,
        label_doc_nodes: bool = False,
        label_nodes: bool = False,
        label_edges: bool = False,
        node_label_filter: Callable[[ig.Vertex], bool] | None = None,
        edge_label_filter: Callable[[ig.Edge], bool] | None = None,
        scale_nodes_by: str | None = None,
        **kwargs,
    ) -> ig.Plot:
        """
        Plot the bipartite graph.

        Parameters
        ----------
        color_clusters : bool or VertexClustering, optional
            Color nodes according to clusters detected by the Leiden
            algorithm (default: False). Alternately, a clustering object
            generated by another community detection algorithm can be
            passed.
        show_clusters : bool or VertexClustering, optional
            Mark clusters detected by the Leiden algorithm (default:
            False). Alternately, a clustering object generated by another
            community detection algorithm can be passed.
        bipartite_layout : bool, optional
            Use a bipartite graph layout (default: False; a weighted
            Fruchterman-Reingold layout is used unless another layout is
            specified).
        sugiyama_layout : bool, optional
            Use layered Sugiyama layout (default: False; a weighted
            Fruchterman-Reingold layout is used unless another layout is
            specified).
        circular_layout : bool, optional
            Use circular Reingold-Tilford layout (default: False; a
            weighted Fruchterman-Reingold layout is used unless another
            layout is specified).
        kamada_kawai_layout : bool, optional
            Use a layout created by the Kamada-Kawai algorithm (default:
            False; a weighted Fruchterman-Reingold layout is used unless
            another layout is specified).
        drl_layout : bool, optional
            Use the DrL layout, suitable for large networks (default:
            False; a weighted Fruchterman-Reingold layout is used unless
            another layout is specified).
        node_opacity : float, optional
            Opacity (between 0 and 1) to apply to nodes (default: no
            transparency).
        edge_opacity : float, optional
            Opacity (between 0 and 1) to apply to edges (default: no
            transparency).
        label_term_nodes : bool, optional
            Label term nodes (default: False).
        label_doc_nodes : bool, optional
            Label document nodes (default: False).
        label_nodes : bool, optional
            Label term and document nodes (default: False).
        label_edges : bool, optional
            Show edge weights in plot (default: False).
        node_label_filter : function, optional
            Function returning a boolean value, mapped over the nodes to
            decide whether or not to suppress their labels.
        edge_label_filter : function, optional
            Function returning a boolean value, mapped over the edges to
            decide whether or not to suppress their labels.
        scale_nodes_by : str, optional
            Name of centrality measure or node attribute to scale nodes
            by. Possible values: ``degree``, ``strength``, ``hits``,
            ``cohits``, ``birank``, or any node attribute (default: None).

        Other Parameters
        ----------------
        target : str or file, optional
            File or path that the plot should be saved to (e.g.,
            ``plot.png``).
        kwargs
            Additional arguments to pass to `igraph.drawing.plot`.

        Returns
        -------
        `igraph.drawing.Plot`
            The plot can be directly displayed in a Jupyter notebook or
            saved as an image file.
        """
        args = locals()
        del args["self"], args["kwargs"]
        kwargs.update(args)
        return self._plot(**kwargs)

    @cached_property
    def hits(self) -> pd.Series:
        """HITS rank of nodes."""
        return bipartite_rank(self, normalizer="HITS")

    @cached_property
    def cohits(self) -> pd.Series:
        """CoHITS rank of nodes."""
        return bipartite_rank(self, normalizer="CoHITS")

    @cached_property
    def birank(self) -> pd.Series:
        """BiRank of nodes."""
        return bipartite_rank(self, normalizer="BiRank")

    @cached_property
    def bipartite_cc(self) -> pd.Series:
        """
        Calculate the unweighted bipartite clustering coefficient.

        Returns
        -------
        `pandas.Series`
            The clustering coefficients indexed by node label.

        Notes
        -----
        Adapted from the ``networkx`` implementation.

        References
        ----------
        :cite:`Latapy2008`
        """
        ccs = []
        for node in self.nodes:
            cc: float = 0
            fon = set(node.neighbors())
            son = {nn for nbr in fon for nn in nbr.neighbors()} - {node}
            for nn in son:
                nnn = set(nn.neighbors())
                cc += len(nnn & fon) / len(nnn | fon)
            if cc > 0:
                cc /= len(son)
            ccs.append(cc)
        return pd.Series(ccs, index=self.nodes["id"])

    def _partition_graph(self, resolution: float, seed: int) -> ig.VertexClustering:
        part, part0, part1 = la.CPMVertexPartition.Bipartite(
            self.graph, resolution_parameter_01=resolution, weights="weight"
        )
        opt = la.Optimiser()
        opt.set_rng_seed(seed)
        opt.optimise_partition_multiplex(
            [part, part0, part1], layer_weights=[1, -1, -1], n_iterations=-1
        )
        return part


for prop, desc in [
    ("hits", "HITS rank"),
    ("cohits", "CoHITS rank"),
    ("birank", "BiRank"),
    ("bipartite_cc", "bipartite clustering coefficient"),
]:
    setattr(Textnet, f"top_{prop}", _make_top(prop, desc))
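
# Construction sketch (hedged; assumes a `Corpus` instance named `corpus`
# built elsewhere with the textnets API):
#
#     >>> net = Textnet(corpus.tokenized(), min_docs=2)
#     >>> net.top_birank()  # ten top-ranked nodes under the BiRank normalizer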


class ProjectedTextnet(TextnetBase):
    """
    One-mode projection of a textnet.

    Created by calling `Textnet.project()` with the desired ``node_type``.

    Attributes
    ----------
    graph : `igraph.Graph`
        Direct access to the igraph object.
    """

    @cached_property
    def m(self) -> pd.DataFrame:
        """Weighted adjacency matrix."""
        d = self.graph.get_adjacency().data
        m = np.array(d, dtype=float)
        for i in self.edges.indices:
            edge = self.graph.es[i]
            source, target = edge.source, edge.target
            m[source, target] = m[target, source] = edge["weight"]
        return pd.DataFrame(m, index=self.nodes["id"], columns=self.nodes["id"])

    @cached_property
    def betweenness(self) -> pd.Series:
        """Weighted betweenness centrality."""
        return pd.Series(
            self.graph.betweenness(weights="cost"), index=self.nodes["id"]
        )

    @cached_property
    def closeness(self) -> pd.Series:
        """Weighted closeness centrality."""
        return pd.Series(self.graph.closeness(weights="cost"), index=self.nodes["id"])

    @cached_property
    def harmonic(self) -> pd.Series:
        """Weighted harmonic centrality."""
        return pd.Series(
            self.graph.harmonic_centrality(weights="cost"), index=self.nodes["id"]
        )

    @cached_property
    def eigenvector_centrality(self) -> pd.Series:
        """Weighted eigenvector centrality."""
        return pd.Series(
            self.graph.eigenvector_centrality(weights="weight"),
            index=self.nodes["id"],
        )

    @cached_property
    def pagerank(self) -> pd.Series:
        """Weighted PageRank centrality."""
        return pd.Series(self.graph.pagerank(weights="weight"), index=self.nodes["id"])

    @property
    def spanning(self) -> pd.Series:
        """Textual spanning measure."""
        if any(self.node_types):
            warn("Textual spanning is only defined for document nodes.")
        if not self.graph.is_connected():
            warn(
                "Graph is disconnected. The textual spanning measure is "
                "not effective on disconnected graphs."
            )
        a = self.m.to_numpy()
        return pd.Series(textual_spanning(a), index=self.nodes["id"])

    def alpha_cut(self, alpha: float) -> ProjectedTextnet:
        """
        Return graph backbone.

        Parameters
        ----------
        alpha : float
            Threshold for edge elimination. Must be between 0 and 1. Edges
            with an alpha value above the specified threshold are removed.

        Returns
        -------
        `ProjectedTextnet`
            New textnet sans pruned edges.
        """
        if "alpha" not in self.graph.edge_attributes():
            self.graph.es["alpha"] = list(disparity_filter(self.graph))
        pruned = self.graph.copy()
        pruned.delete_edges(pruned.es.select(alpha_ge=alpha))
        return ProjectedTextnet(giant_component(pruned))
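
    # Backbone sketch (hedged): `alpha_cut` drops edges whose significance
    # score is at or above the threshold, so smaller thresholds prune more:
    #
    #     >>> terms = net.project(node_type="term")  # `net` assumed given
    #     >>> backbone = terms.alpha_cut(0.05)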

    def plot(self, *, alpha: float | None = None, **kwargs) -> ig.Plot:
        """
        Plot the projected graph.

        Parameters
        ----------
        alpha : float, optional
            Threshold for edge elimination. Must be between 0 and 1. Edges
            with an alpha value above the specified threshold are removed.
            This is useful when plotting "hairball" graphs.
        scale_nodes_by : str, optional
            Name of centrality measure or node attribute to scale nodes
            by. Possible values: ``degree``, ``strength``, ``betweenness``,
            ``closeness``, ``eigenvector_centrality``, ``pagerank``, or any
            node attribute (default: None).

        Other Parameters
        ----------------
        target : str or file, optional
            File or path that the plot should be saved to (e.g.,
            ``plot.png``).
        kwargs
            Additional arguments to pass to `igraph.drawing.plot`.

        Returns
        -------
        `igraph.drawing.Plot`
            The plot can be directly displayed in a Jupyter notebook or
            saved as an image file.
        """
        if alpha is not None:
            to_plot = self.alpha_cut(alpha)
        else:
            to_plot = self
        return to_plot._plot(**kwargs)

    def _partition_graph(self, resolution: float, seed: int) -> ig.VertexClustering:
        part = la.find_partition(
            self.graph,
            la.ModularityVertexPartition,
            weights="weight",
            n_iterations=-1,
            seed=seed,
        )
        return part


for prop, desc in [
    ("betweenness", "betweenness"),
    ("closeness", "closeness"),
    ("harmonic", "harmonic centrality"),
    ("pagerank", "PageRank centrality"),
    ("eigenvector_centrality", "eigenvector centrality"),
    ("spanning", "textual spanning"),
]:
    setattr(ProjectedTextnet, f"top_{prop}", _make_top(prop, desc))

ProjectedTextnet.top_ev = ProjectedTextnet.top_eigenvector_centrality  # type: ignore


def _matrix_from_tidy_text(
    tidy_text: TidyText | pd.DataFrame, min_docs: int
) -> BiadjacencyMatrix:
    count = tidy_text.groupby("term").count()["n"]
    tt = (
        tidy_text.reset_index()
        .merge(count >= min_docs, on="term", how="left")
        .rename(columns={"n_y": "keep", "n_x": "n"})
    )
    m = (
        tt[tt["keep"]]
        .groupby(["label", "term"])
        .first()["term_weight"]
        .astype(pd.SparseDtype("float"))
        .unstack(fill_value=0)
    )
    return BiadjacencyMatrix(m.astype("float64"))


def _graph_from_matrix(m: BiadjacencyMatrix) -> ig.Graph:
    g = ig.Graph.Biadjacency(m.to_numpy().tolist(), directed=False, weighted=True)
    g.vs["id"] = np.append(m.index, m.columns).tolist()
    g.es["cost"] = [1 / pow(w, tn.params["tuning_parameter"]) for w in g.es["weight"]]
    g.vs["type"] = ["term" if t else "doc" for t in g.vs["type"]]
    return g
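
# Pipeline sketch (hedged; mirrors what `Textnet.graph` does internally):
# a tidy token table becomes a weighted biadjacency matrix, which becomes
# a weighted bipartite igraph graph. `tidy_text` is assumed given:
#
#     >>> m = _matrix_from_tidy_text(tidy_text, min_docs=2)
#     >>> g = _graph_from_matrix(m)
#     >>> set(g.vs["type"]) <= {"doc", "term"}
#     True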


def disparity_filter(graph: ig.Graph) -> Iterator[float]:
    """
    Compute significance scores of edge weights.

    Parameters
    ----------
    graph : Graph
        The one-mode graph to compute the significance scores for.

    Yields
    ------
    float
        Iterator of significance scores.

    Notes
    -----
    Provided the package was installed properly, a compiled extension will
    be used for a significant speedup.

    References
    ----------
    :cite:`Serrano2009`
    """
    tqdm_args = dict(disable=not tn.params["progress_bar"] or None, unit="edges")
    cores = cpu_count() or 1
    if cores > 1 and len(graph.es) >= cores:
        sig_ufunc = np.frompyfunc(_edge_significance, 1, 1)
        edge_chunks = np.array_split(pd.Series(list(graph.es)), cores)
        yield from pd.concat(thread_map(sig_ufunc, edge_chunks, **tqdm_args))
    else:
        for edge in tqdm(graph.es, **tqdm_args):
            yield _edge_significance(edge)


def _edge_significance(edge: ig.Edge) -> float:
    source, target = edge.vertex_tuple
    degree_t = target.degree()
    degree_s = source.degree()
    sum_weights_s = source.strength(weights="weight")
    norm_weight_s = edge["weight"] / sum_weights_s
    sum_weights_t = target.strength(weights="weight")
    norm_weight_t = edge["weight"] / sum_weights_t
    integral_s = _disparity_filter_integral(norm_weight_s, degree_s)
    integral_t = _disparity_filter_integral(norm_weight_t, degree_t)
    return min(1 - (degree_s - 1) * integral_s[0], 1 - (degree_t - 1) * integral_t[0])


@memoize
def _disparity_filter_integral(
    norm_weight: float, degree: int
) -> tuple[float, float]:
    return quad(integrand, 0, norm_weight, args=(degree,))
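
# Worked detail (sketch): for a node of degree k and normalized edge weight
# p, the significance score reduces to a closed form,
#     1 - (k - 1) * integral(0, p) of (1 - x)**(k - 2) dx == (1 - p)**(k - 1),
# so, e.g., p = 0.5 on a degree-3 node yields 0.5**2 == 0.25, and that edge
# survives any `alpha_cut` threshold above 0.25.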


def giant_component(g: ig.Graph) -> ig.Graph:
    """
    Return the subgraph corresponding to the giant component.

    Parameters
    ----------
    g : `igraph.Graph`
        The (possibly) disconnected graph.

    Returns
    -------
    `igraph.Graph`
        The graph consisting of just the largest connected component.
    """
    size = max(g.components().sizes())
    pos = g.components().sizes().index(size)
    return g.subgraph(g.components()[pos])
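
# Usage sketch (`g` is any igraph.Graph):
#
#     >>> gc = giant_component(g)
#     >>> gc.is_connected()
#     True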


def bipartite_rank(
    net: Textnet,
    normalizer: Literal["HITS", "CoHITS", "BGRM", "BiRank"],
    alpha: float = 0.85,
    beta: float = 0.85,
    max_iter: int = -1,
    tolerance: float = 1.0e-4,
) -> pd.Series:
    """
    Calculate centralities of nodes in the bipartite network.

    Parameters
    ----------
    net : Textnet
        The bipartite network to compute ranks for.
    normalizer : string
        The normalizer to use: ``HITS``, ``CoHITS``, ``BGRM``, or
        ``BiRank``. See reference for details.
    alpha : float, optional
        Damping factor for the term (column) ranks.
    beta : float, optional
        Damping factor for the document (row) ranks.
    max_iter : int, optional
        Maximum number of iterations to run before reaching convergence
        (default: -1, meaning iterate until the errors are within the
        specified tolerance).
    tolerance : float, optional
        Error tolerance when checking for convergence.

    Raises
    ------
    ValueError
        If an invalid normalizer is specified.

    Returns
    -------
    `pandas.Series`
        The ranks for both sets of nodes indexed by node label.

    Notes
    -----
    Adapted from the implementation by :cite:t:`Yang2020`.

    References
    ----------
    :cite:`He2017`
    """
    if normalizer not in ("HITS", "CoHITS", "BGRM", "BiRank"):
        raise ValueError(f"'{normalizer}' is not a valid normalization option.")
    W = net.m.to_numpy()
    Kd = np.array(W.sum(axis=1)).flatten()
    Kp = np.array(W.T.sum(axis=1)).flatten()
    Kd[np.where(Kd == 0)] += 1
    Kp[np.where(Kp == 0)] += 1
    Kd_ = np.diagflat(1 / Kd)
    Kp_ = np.diagflat(1 / Kp)
    if normalizer == "HITS":
        Sp = W.T
        Sd = W
    elif normalizer == "CoHITS":
        Sp = W.T.dot(Kd_)
        Sd = W.dot(Kp_)
    elif normalizer == "BGRM":
        Sp = Kp_.dot(W.T).dot(Kd_)
        Sd = Sp.T
    elif normalizer == "BiRank":
        Kd_bi = np.diagflat(1 / np.sqrt(Kd))
        Kp_bi = np.diagflat(1 / np.sqrt(Kp))
        Sp = Kp_bi.dot(W.T).dot(Kd_bi)
        Sd = Sp.T
    p0 = np.repeat(1 / Kp_.shape[0], Kp_.shape[0])
    p_last = p0.copy()
    d0 = np.repeat(1 / Kd_.shape[0], Kd_.shape[0])
    d_last = d0.copy()
    iter_count = 0
    continue_iter = iter_count < max_iter or max_iter < 0
    while continue_iter:
        p = alpha * (Sp.dot(d_last)) + (1 - alpha) * p0
        d = beta * (Sd.dot(p_last)) + (1 - beta) * d0
        if normalizer == "HITS":
            p = p / p.sum()
            d = d / d.sum()
        err_p = np.absolute(p - p_last).sum()
        err_d = np.absolute(d - d_last).sum()
        iter_count += 1
        if err_p < tolerance and err_d < tolerance:
            continue_iter = False
        else:
            continue_iter = iter_count < max_iter or max_iter < 0
        p_last = p
        d_last = d
    return pd.Series(np.append(d, p), index=net.nodes["id"])
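
# Usage sketch (hedged; `net` is an assumed Textnet). Each iteration updates
# p = alpha * Sp @ d_last + (1 - alpha) * p0 and
# d = beta * Sd @ p_last + (1 - beta) * d0 (L1-normalized under "HITS")
# until both L1 errors drop below `tolerance`:
#
#     >>> ranks = bipartite_rank(net, normalizer="CoHITS")
#     >>> ranks.sort_values(ascending=False).head()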


def textual_spanning(m: np.ndarray, alpha: float = 1.0) -> np.ndarray:
    """
    Calculate textual spanning of documents in a similarity matrix.

    Parameters
    ----------
    m : array
        Document similarity matrix.
    alpha : float, optional
        Tuning parameter to change relevance of edge weights vs. document
        counts in calculating the spanning score (default: 1.0).

    Returns
    -------
    `numpy.array`
        Each node's textual spanning measure.

    Notes
    -----
    The textual spanning measure is not effective on disconnected graphs.

    References
    ----------
    :cite:`Stoltz2019`
    """
    np.fill_diagonal(m, 0)
    k = np.sum(m != 0, axis=1)
    den = k * (m.sum(axis=1) / k**alpha)
    ps = m.T / den[:, np.newaxis]
    eps = np.zeros_like(ps)
    eps[ps > 0] = ps[ps > 0] ** -1
    ps2 = eps @ ps
    sp = (ps + ps2) ** 2
    csp = sp.sum(axis=1)
    csp_norm = ((csp - csp.mean()) / csp.std(ddof=1)) * -1
    return csp_norm
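
# Usage sketch (hedged): for a document projection, pass the dense
# similarity matrix; the result is z-scored, so values near zero are
# average spanning:
#
#     >>> docs = net.project(node_type="doc")  # `net` assumed given
#     >>> scores = textual_spanning(docs.m.to_numpy())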


class BiadjacencyMatrix(LiteFrame):
    """Matrix relating documents to terms."""