Source code for xgi.communities.spectral

"""Community detection via clustering of Laplacian eigenvectors."""

import numpy as np
from scipy.sparse.linalg import eigsh

from ..exception import XGIError
from ..linalg.laplacian_matrix import normalized_hypergraph_laplacian

__all__ = [
    "spectral_clustering",
]


[docs]def spectral_clustering(H, k=2, max_iter=1_000, seed=None): """Computes a spectral clustering in :math:`k` partitions of the input hypergraph according to the heuristic presented in [1]. This is done by computing the normalized Laplacian of the input hypergraph and then applying :math:`k`-means clustering on the hypergraph vertices represented in the :math:`k`-dimensional Euclidian space generated by the eigenvectors corresponding to the :math:`k` smallest eigenvalues of the normalized Laplacian. Parameters ---------- H : Hypergraph Hypergraph k : int, optional Number of clusters to find, default 2. max_iter : int, optional. Maximum number of cluster updates to compute, default 1,000. seed : int, numpy.random.Generator, or None, optional The seed for the random number generator. By default, None. Returns ------- dict A dictionary mapping node ids to their clusters. Clusters begin at 0. Raises ------ XGIError If more groups are specified than nodes in the hypergraph. References ---------- .. [1] Zhou, D., Huang, J., & Schölkopf, B. (2006). Learning with Hypergraphs: Clustering, Classification, and Embedding Advances in Neural Information Processing Systems. """ if k > H.num_nodes: raise XGIError( "The number of desired clusters cannot exceed the number of nodes!" ) # Compute normalize Laplacian and its spectra L, rowdict = normalized_hypergraph_laplacian(H, index=True) v0 = np.random.default_rng(seed).random(L.shape[0]) if seed is not None else None evals, eigs = eigsh(L, k=k, which="SA", v0=v0) # Form metric space representation X = np.array(eigs) # Apply k-means clustering _clusters = _kmeans(X, k, max_iter, seed) # Remap to node ids clusters = {rowdict[id]: cluster for id, cluster in _clusters.items()} return clusters
def _kmeans(X, k, max_iter=1_000, seed=None): """Simple k-means clustering of vectors X. Uses Forgy method for selecting initial centroids. Parameters ---------- X : (n, k) array Vectors to cluster. k : int Number of clusters to find. max_iter : int, optional. Maximum number of cluster updates to compute, default 10,000. seed : int, numpy.random.Generator, or None, optional The seed for the random number generator. By default, None. Returns ------- dict A dictionary mapping node ids to their clusters. Clusters begin at 0. """ rng = np.random.default_rng(seed=seed) # Handle edge cases if k == 1: return {node_idx: 1 for node_idx in range(X.shape[0])} # Initialize stopping criterion num_cluster_changes = np.inf num_iterations = 0 # Instantiate random centers centroids = rng.choice(X, size=k, replace=False) # Instantiate random clusters previous_clusters = {node: rng.integers(0, k) for node in range(X.shape[0])} # Iterate main kmeans computation while (num_cluster_changes > 0) and (num_iterations < max_iter): # Find nearest centroid to each point clusters = dict() vectors = {cluster: [] for cluster in range(k)} for node, vector in enumerate(X): distances = [np.linalg.norm(vector - centroid) for centroid in centroids] closest_centroid = np.argmin(distances) clusters[node] = closest_centroid vectors[closest_centroid].append(vector) # Update centroids centroids = [np.mean(vectors[cluster_id]) for cluster_id in vectors] # Update convergence condition cluster_changes = { node: clusters[node] != previous_clusters[node] for node in range(X.shape[0]) } num_cluster_changes = len( list(filter(lambda diff: diff, cluster_changes.values())) ) num_iterations += 1 return clusters