Source code for easygraph.functions.centrality.katz_centrality
from easygraph.utils import *
import numpy as np
from easygraph.utils.decorators import *
__all__ = ["katz_centrality"]
[docs]
@not_implemented_for("multigraph")
@hybrid("cpp_katz_centrality")
def katz_centrality(G, alpha=0.1, beta=1.0, max_iter=1000, tol=1e-6, normalized=True):
    r"""
    Compute the Katz centrality for nodes in a graph.

    Katz centrality computes the influence of a node based on the total number
    of walks between nodes, attenuated by a factor of their length. It is
    defined as the solution to the linear system:

    .. math::

        x = \alpha A x + \beta

    where:

    - :math:`A` is the 0/1 adjacency matrix of the graph, with
      :math:`A_{uv} = 1` whenever ``v`` appears in ``G.adj[u]`` (edge weights
      are ignored),
    - :math:`\alpha` is a scalar attenuation factor,
    - :math:`\beta` is the bias vector (typically all ones),
    - and :math:`x` is the resulting centrality vector.

    The algorithm runs an iterative fixed-point method, starting from the
    all-ones vector, until convergence.

    Parameters
    ----------
    G : easygraph.Graph
        An EasyGraph graph instance. Must be simple (non-multigraph).
    alpha : float, optional (default=0.1)
        Attenuation factor, must be smaller than the reciprocal of the largest
        eigenvalue of the adjacency matrix to ensure convergence.
    beta : float or dict, optional (default=1.0)
        Bias term. Can be a constant scalar applied to all nodes, or a dictionary
        mapping node IDs to values. Nodes missing from the dictionary get a
        bias of 1.0.
    max_iter : int, optional (default=1000)
        Maximum number of iterations before the algorithm terminates.
    tol : float, optional (default=1e-6)
        Convergence tolerance. Iteration stops when the L1 norm of the difference
        between successive iterations is below this threshold.
    normalized : bool, optional (default=True)
        If True, the result vector will be normalized to unit norm (L2).

    Returns
    -------
    dict
        A dictionary mapping node IDs to Katz centrality scores. An empty
        graph yields an empty dictionary.

    Raises
    ------
    RuntimeError
        If the algorithm fails to converge within `max_iter` iterations.

    Examples
    --------
    >>> import easygraph as eg
    >>> from easygraph import katz_centrality
    >>> G = eg.Graph()
    >>> G.add_edges_from([(0, 1), (1, 2), (2, 3)])
    >>> katz_centrality(G, alpha=0.05)
    {0: 0.487..., 1: 0.512..., 2: 0.512..., 3: 0.487...}
    """
    # Fix a node ordering so array positions map back to node IDs.
    nodes = list(G.nodes)
    n = len(nodes)
    if n == 0:
        return {}
    node_to_index = {node: i for i, node in enumerate(nodes)}

    # Store the adjacency structure as parallel (row, col) index arrays
    # instead of a dense n x n matrix: memory and per-iteration cost are
    # then proportional to the number of edges rather than n**2.
    rows = []
    cols = []
    for u in nodes:
        u_idx = node_to_index[u]
        for v in G.adj[u]:
            rows.append(u_idx)
            cols.append(node_to_index[v])
    rows = np.asarray(rows, dtype=np.intp)
    cols = np.asarray(cols, dtype=np.intp)

    # Bias vector: per-node values from a dict (default 1.0) or one scalar.
    if isinstance(beta, dict):
        b = np.array([beta.get(node, 1.0) for node in nodes], dtype=np.float64)
    else:
        b = np.full(n, beta, dtype=np.float64)

    x = np.ones(n, dtype=np.float64)
    for _ in range(max_iter):
        x_prev = x
        # (A @ x)[u] is the sum of x[v] over v in G.adj[u]; bincount performs
        # exactly that grouped sum over the edge list.
        x = alpha * np.bincount(rows, weights=x_prev[cols], minlength=n) + b
        # x already holds the newest iterate here, so breaking out returns
        # the converged vector rather than the one before it.
        if np.linalg.norm(x - x_prev, ord=1) < tol:
            break
    else:
        raise RuntimeError(f"Katz centrality failed to converge in {max_iter} iterations")

    if normalized:
        norm = np.linalg.norm(x)
        if norm > 0:
            x = x / norm

    return {node: float(score) for node, score in zip(nodes, x)}