from easygraph.utils import *
from easygraph.utils.decorators import *
__all__ = [
    "betweenness_centrality",
]


def betweenness_centrality_parallel(nodes, G, path_length, accumulate):
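    # Worker for the multiprocessing code path below: computes the partial
    # betweenness contributed by one chunk of source nodes; the caller sums
    # the partial results from every worker.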
    betweenness = {node: 0.0 for node in G}
    for node in nodes:
        S, P, sigma = path_length(G, source=node)
        betweenness = accumulate(betweenness, S, P, sigma, node)
    return betweenness


@not_implemented_for("multigraph")
@hybrid("cpp_betweenness_centrality")
def betweenness_centrality(
    G, weight=None, sources=None, normalized=True, endpoints=False, n_workers=None
):
r"""Compute the shortest-basic betweenness centrality for nodes.
.. math::
c_B(v) = \sum_{s,t \in V} \frac{\sigma(s, t|v)}{\sigma(s, t)}
where V is the set of nodes,
.. math::
\sigma(s, t)
is the number of shortest (s, t)-paths, and
.. math::
\sigma(s, t|v)
is the number of those paths passing through some node v other than s, t.
.. math::
If\ s\ =\ t,\ \sigma(s, t) = 1, and\ if\ v \in {s, t}, \sigma(s, t|v) = 0 [2]_.
    Parameters
    ----------
    G : graph
        An easygraph graph.

    weight : None or string, optional (default=None)
        If None, all edge weights are considered equal.
        Otherwise holds the name of the edge attribute used as weight.

    sources : None or list of nodes, optional (default=None)
        If None, all nodes are considered.
        Otherwise, the set of source vertices to consider when
        calculating shortest paths.

    normalized : bool, optional
        If True, the betweenness values are normalized by
        `2/((n-1)(n-2))` for undirected graphs and by `1/((n-1)(n-2))`
        for directed graphs, where `n` is the number of nodes in G.

    endpoints : bool, optional
        If True, include the endpoints in the shortest-path counts.

    n_workers : int or None, optional (default=None)
        If not None, the number of worker processes used to compute
        the centrality in parallel.

    Returns
    -------
    betweenness : list
        List of betweenness centrality values, one per node of G.

    Examples
    --------
    >>> betweenness_centrality(G, weight="weight")
    """
    import functools

    if weight is not None:
        path_length = functools.partial(_single_source_dijkstra_path, weight=weight)
    else:
        path_length = functools.partial(_single_source_bfs_path)
    if endpoints:
        accumulate = functools.partial(_accumulate_endpoints)
    else:
        accumulate = functools.partial(_accumulate_basic)
    if sources is not None:
        nodes = sources
    else:
        nodes = G.nodes
    betweenness = dict.fromkeys(G, 0.0)
    if n_workers is not None:
        # use the parallel version for large graphs
        import random
        from functools import partial
        from multiprocessing import Pool

        nodes = list(nodes)
        # shuffle so every chunk gets a comparable mix of nodes
        random.shuffle(nodes)
        if len(nodes) > n_workers * 30000:
            nodes = split_len(nodes, step=30000)
        else:
            nodes = split(nodes, n_workers)
        local_function = partial(
            betweenness_centrality_parallel,
            G=G,
            path_length=path_length,
            accumulate=accumulate,
        )
        with Pool(n_workers) as p:
            ret = p.imap(local_function, nodes)
            # merge the per-chunk partial scores
            for res in ret:
                for key in res:
                    betweenness[key] += res[key]
    else:
        # use the sequential version for small graphs
        for node in nodes:
            S, P, sigma = path_length(G, source=node)
            betweenness = accumulate(betweenness, S, P, sigma, node)
    betweenness = _rescale(
        betweenness,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        endpoints=endpoints,
    )
    return list(betweenness.values())


def _rescale(betweenness, n, normalized, directed=False, endpoints=False):
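    # Normalized scores divide by the number of ordered node pairs:
    # n * (n - 1) when endpoints are counted, (n - 1) * (n - 2) otherwise.
    # Unnormalized undirected scores are halved because each shortest path
    # is discovered twice, once from each endpoint.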
    if normalized:
        if endpoints:
            if n < 2:
                scale = None  # no normalization
            else:
                # Scale factor should include endpoint nodes
                scale = 1 / (n * (n - 1))
        elif n <= 2:
            scale = None  # no normalization, b=0 for all nodes
        else:
            scale = 1 / ((n - 1) * (n - 2))
    else:  # rescale by 2 for undirected graphs
        if not directed:
            scale = 0.5
        else:
            scale = None
    if scale is not None:
        for v in betweenness:
            betweenness[v] *= scale
    return betweenness


def _single_source_bfs_path(G, source):
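    # Forward sweep of Brandes' algorithm on an unweighted graph: a BFS from
    # `source` recording S (nodes in order of nondecreasing distance),
    # P (shortest-path predecessors), and sigma (shortest-path counts).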
    from collections import deque

    S = []
    P = {v: [] for v in G}
    sigma = dict.fromkeys(G, 0.0)
    D = {}
    sigma[source] = 1.0
    D[source] = 0
    Q = deque([source])
    adj = G.adj
    while Q:
        v = Q.popleft()  # O(1) dequeue instead of list.pop(0)
        S.append(v)
        Dv = D[v]
        sigmav = sigma[v]
        for w in adj[v]:
            if w not in D:
                Q.append(w)
                D[w] = Dv + 1
            if D[w] == Dv + 1:
                sigma[w] += sigmav
                P[w].append(v)
    return S, P, sigma


def _single_source_dijkstra_path(G, source, weight="weight"):
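    # Weighted variant of the forward sweep: Dijkstra's algorithm from
    # `source`, reading edge weights from the `weight` attribute (missing
    # attributes default to 1); returns the same (S, P, sigma) triple as
    # the BFS version.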
    from heapq import heappop
    from heapq import heappush
    from itertools import count

    push = heappush
    pop = heappop
    S = []
    P = {v: [] for v in G}
    sigma = dict.fromkeys(G, 0.0)
    D = {}
    sigma[source] = 1.0
    seen = {source: 0}
    Q = []
    c = count()
    adj = G.adj
    # heap entries are (distance, tie-break counter, predecessor, node)
    push(Q, (0, next(c), source, source))
    while Q:
        (dist, _, pred, v) = pop(Q)
        if v in D:
            continue  # this node has already been finalized
        sigma[v] += sigma[pred]  # count shortest paths
        S.append(v)
        D[v] = dist
        for w in adj[v]:
            vw_dist = dist + adj[v][w].get(weight, 1)
            if w not in D and (w not in seen or vw_dist < seen[w]):
                # found a strictly shorter path to w
                seen[w] = vw_dist
                push(Q, (vw_dist, next(c), v, w))
                sigma[w] = 0.0
                P[w] = [v]
            elif vw_dist == seen[w]:  # handle equal paths
                sigma[w] += sigma[v]
                P[w].append(v)
    return S, P, sigma


def _accumulate_endpoints(betweenness, S, P, sigma, s):
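    # Dependency accumulation with endpoints included: the source earns one
    # unit per reachable node, and every other node on a shortest path earns
    # an extra +1 beyond its ordinary dependency score.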
    betweenness[s] += len(S) - 1
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w] + 1
    return betweenness


def _accumulate_basic(betweenness, S, P, sigma, s):
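    # Backward sweep of Brandes' algorithm: pop nodes in order of
    # nonincreasing distance and push dependencies to predecessors via
    # delta[v] += sigma[v] / sigma[w] * (1 + delta[w]).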
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w]
    return betweenness
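

# A minimal usage sketch (assumes easygraph's `Graph` and `add_edges` API;
# adjust to your installed version):
if __name__ == "__main__":
    import easygraph as eg

    G = eg.Graph()
    G.add_edges([(1, 2), (2, 3), (2, 4), (3, 4)])
    # Node 2 sits on the most shortest paths, so its score is the largest.
    print(betweenness_centrality(G))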