from heapq import heappop
from heapq import heappush
from itertools import count
from math import isnan
from operator import itemgetter
from easygraph.utils.decorators import *
# Names exported by ``from <module> import *``: the two edge generators and
# the two tree builders. The per-algorithm generators are left out of the list.
__all__ = [
"minimum_spanning_edges",
"maximum_spanning_edges",
"minimum_spanning_tree",
"maximum_spanning_tree",
]
def boruvka_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Yield the edges of a min/max spanning forest found by Borůvka's algorithm.

    Parameters
    ----------
    G : EasyGraph Graph
        Edge weights should be pairwise distinct; with ties the yielded
        edges may fail to form a tree.
    minimum : bool (default: True)
        True selects the minimum spanning tree, False the maximum one.
    weight : string (default: 'weight')
        Edge attribute that stores the weight. Edges lacking it count as 1.
    data : bool (default: True)
        When True yield ``(u, v, d)`` with ``d`` the edge attribute dict,
        otherwise yield ``(u, v)``.
    ignore_nan : bool (default: False)
        When False a NaN weight raises ``ValueError``; when True such edges
        are skipped.

    Raises
    ------
    ValueError
        If a NaN edge weight is met and `ignore_nan` is False.
    """
    # Start from the discrete partition: every node is its own component.
    components = UnionFind(G)

    def cheapest_crossing_edge(nodes):
        """Return the optimum edge leaving `nodes`, or ``None`` if there is none."""
        # Maximisation is handled by minimising the negated weight.
        factor = 1 if minimum else -1
        chosen = None
        chosen_wt = float("inf")
        for e in edge_boundary(G, nodes, data=True):
            signed = e[-1].get(weight, 1) * factor
            if isnan(signed):
                if not ignore_nan:
                    msg = f"NaN found as an edge weight. Edge {e}"
                    raise ValueError(msg)
                continue
            if signed < chosen_wt:
                chosen_wt, chosen = signed, e
        return chosen

    def round_of_best_edges():
        """Collect the optimum boundary edge of every current component.

        Components whose boundary is empty contribute nothing. The result
        is a list (not an iterator) and may hold the same edge twice in
        opposite orientations; the second copy is rejected by the
        union-find test in the main loop.
        """
        found = []
        for part in components.to_sets():
            edge = cheapest_crossing_edge(part)
            if edge is not None:
                found.append(edge)
        return found

    # An empty round means no component has an outgoing edge any more, so
    # the forest is complete.
    pending = round_of_best_edges()
    while pending:
        # TODO The per-component search, and to an extent the merge loop
        # below (union must be atomic), could be parallelized.
        pending = round_of_best_edges()
        for u, v, d in pending:
            if components[u] == components[v]:
                # Endpoints were already joined earlier in this round.
                continue
            if data:
                yield u, v, d
            else:
                yield u, v
            components.union(u, v)
@hybrid("cpp_kruskal_mst_edges")
def kruskal_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Yield the edges of a min/max spanning forest found by Kruskal's algorithm.

    Parameters
    ----------
    G : EasyGraph Graph
        Graph whose ``edges`` attribute iterates as ``(u, v, attr_dict)``.
    minimum : bool (default: True)
        True selects the minimum spanning tree, False the maximum one.
    weight : string (default: 'weight')
        Edge attribute that stores the weight. Edges lacking it count as 1.
    data : bool (default: True)
        When True yield ``(u, v, d)`` with ``d`` the edge attribute dict,
        otherwise yield ``(u, v)``.
    ignore_nan : bool (default: False)
        When False a NaN weight raises ``ValueError``; when True such edges
        are skipped.

    Raises
    ------
    ValueError
        If a NaN edge weight is met and `ignore_nan` is False.
    """
    merged = UnionFind()
    snapshot = [(a, b, attrs) for a, b, attrs in G.edges]

    # Maximisation is handled by sorting on the negated weight.
    sign = 1 if minimum else -1
    keyed = []
    for u, v, d in snapshot:
        wt = d.get(weight, 1) * sign
        if isnan(wt):
            if not ignore_nan:
                msg = f"NaN found as an edge weight. Edge {(u, v, d)}"
                raise ValueError(msg)
            continue
        keyed.append((wt, u, v, d))

    # Sort on the signed weight only, so nodes and attribute dicts are never
    # compared; the sort is stable, keeping ties in input order.
    keyed.sort(key=itemgetter(0))

    for _, u, v, d in keyed:
        if merged[u] == merged[v]:
            # Both endpoints already lie in one tree: the edge closes a cycle.
            continue
        if data:
            yield (u, v, d)
        else:
            yield (u, v)
        merged.union(u, v)
@hybrid("cpp_prim_mst_edges")
def prim_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Iterate over edges of Prim's algorithm min/max spanning tree.

    The search is restarted from an arbitrary unvisited node until every
    node has been reached, so a disconnected graph yields a spanning forest.

    Parameters
    ----------
    G : EasyGraph Graph
        The graph holding the tree of interest.
    minimum : bool (default: True)
        Find the minimum (True) or maximum (False) spanning tree.
    weight : string (default: 'weight')
        The name of the edge attribute holding the edge weights.
        Edges without the attribute are given weight 1.
    data : bool (default: True)
        Flag for whether to yield edge attribute dicts.
        If True, yield edges `(u, v, d)`, where `d` is the attribute dict.
        If False, yield edges `(u, v)`.
    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    Raises
    ------
    ValueError
        If a NaN edge weight is encountered and `ignore_nan` is False.
    """
    push = heappush
    pop = heappop

    nodes = set(G)
    # Monotone counter used as a tie-breaker so that heap comparisons never
    # fall through to the nodes or the attribute dicts.
    c = count()
    # Maximisation is handled by minimising the negated weight.
    sign = 1 if minimum else -1

    while nodes:
        # Start a new tree from any node not yet covered.
        u = nodes.pop()
        frontier = []
        visited = {u}
        for v, d in G.adj[u].items():
            wt = d.get(weight, 1) * sign
            if isnan(wt):
                if ignore_nan:
                    continue
                msg = f"NaN found as an edge weight. Edge {(u, v, d)}"
                raise ValueError(msg)
            push(frontier, (wt, next(c), u, v, d))
        while frontier:
            _, _, u, v, d = pop(frontier)
            if v in visited or v not in nodes:
                continue
            if data:
                yield u, v, d
            else:
                yield u, v
            # update frontier
            visited.add(v)
            nodes.discard(v)
            for w, d2 in G.adj[v].items():
                if w in visited:
                    continue
                new_weight = d2.get(weight, 1) * sign
                # Apply the same NaN policy as for the start node; a NaN key
                # would otherwise enter the heap and corrupt its ordering.
                if isnan(new_weight):
                    if ignore_nan:
                        continue
                    msg = f"NaN found as an edge weight. Edge {(v, w, d2)}"
                    raise ValueError(msg)
                push(frontier, (new_weight, next(c), v, w, d2))
# Dispatch table from the user-facing ``algorithm`` string to the generator
# implementing it. Borůvka's algorithm is reachable under both the ASCII and
# the accented spelling.
ALGORITHMS = {
"boruvka": boruvka_mst_edges,
"borůvka": boruvka_mst_edges,
"kruskal": kruskal_mst_edges,
"prim": prim_mst_edges,
}
@not_implemented_for("multigraph")
@only_implemented_for_UnDirected_graph
def minimum_spanning_edges(
    G, algorithm="kruskal", weight="weight", data=True, ignore_nan=False
):
    """Generate edges in a minimum spanning forest of an undirected
    weighted graph.

    A minimum spanning tree is a subgraph of the graph (a tree)
    with the minimum sum of edge weights. A spanning forest is a
    union of the spanning trees for each connected component of the graph.

    Parameters
    ----------
    G : undirected Graph
        An undirected graph. If `G` is connected, then the algorithm finds a
        spanning tree. Otherwise, a spanning forest is found.
    algorithm : string
        The algorithm to use when finding a minimum spanning tree. Valid
        choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'.
    weight : string
        Edge data key to use for weight (default 'weight').
    data : bool, optional
        If True yield the edge data along with the edge.
    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    Returns
    -------
    edges : iterator
        An iterator over edges in a minimum spanning tree of `G`.
        Edges connecting nodes `u` and `v` are represented as tuples:
        `(u, v, d)` if `data` is True, otherwise `(u, v)`.

    Raises
    ------
    ValueError
        If `algorithm` is not one of the supported names.

    Examples
    --------
    The examples assume `G` is an undirected 4-cycle on the nodes 0..3
    (edges 0-1, 1-2, 2-3 and 3-0) that was built beforehand.

    >>> from easygraph.functions.basic import mst

    Find minimum spanning edges by Kruskal's algorithm

    >>> G.add_edge(0, 3, weight=2)
    >>> edges = mst.minimum_spanning_edges(G, algorithm="kruskal", data=False)
    >>> edgelist = list(edges)
    >>> sorted(sorted(e) for e in edgelist)
    [[0, 1], [1, 2], [2, 3]]

    Find minimum spanning edges by Prim's algorithm

    >>> G.add_edge(0, 3, weight=2)
    >>> edges = mst.minimum_spanning_edges(G, algorithm="prim", data=False)
    >>> edgelist = list(edges)
    >>> sorted(sorted(e) for e in edgelist)
    [[0, 1], [1, 2], [2, 3]]

    Notes
    -----
    For Borůvka's algorithm, each edge must have a weight attribute, and
    each edge weight must be distinct.

    For the other algorithms, if the graph edges do not have a weight
    attribute a default weight of 1 will be used.

    Modified code from David Eppstein, April 2006
    http://www.ics.uci.edu/~eppstein/PADS/
    """
    try:
        algo = ALGORITHMS[algorithm]
    except KeyError as e:
        msg = f"{algorithm} is not a valid choice for an algorithm."
        raise ValueError(msg) from e

    # The selected generator does the work; ``minimum=True`` fixes the sign.
    return algo(G, minimum=True, weight=weight, data=data, ignore_nan=ignore_nan)
@not_implemented_for("multigraph")
@only_implemented_for_UnDirected_graph
def maximum_spanning_edges(
    G, algorithm="kruskal", weight="weight", data=True, ignore_nan=False
):
    """Generate edges in a maximum spanning forest of an undirected
    weighted graph.

    A maximum spanning tree is a subgraph of the graph (a tree)
    with the maximum possible sum of edge weights. A spanning forest is a
    union of the spanning trees for each connected component of the graph.

    Parameters
    ----------
    G : undirected Graph
        An undirected graph. If `G` is connected, then the algorithm finds a
        spanning tree. Otherwise, a spanning forest is found.
    algorithm : string
        The algorithm to use when finding a maximum spanning tree. Valid
        choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'.
    weight : string
        Edge data key to use for weight (default 'weight').
    data : bool, optional
        If True yield the edge data along with the edge.
    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    Returns
    -------
    edges : iterator
        An iterator over edges in a maximum spanning tree of `G`.
        Edges connecting nodes `u` and `v` are represented as tuples:
        `(u, v, d)` if `data` is True, otherwise `(u, v)`.

    Raises
    ------
    ValueError
        If `algorithm` is not one of the supported names.

    Examples
    --------
    The examples assume `G` is an undirected 4-cycle on the nodes 0..3
    (edges 0-1, 1-2, 2-3 and 3-0) that was built beforehand.

    >>> from easygraph.functions.basic import mst

    Find maximum spanning edges by Kruskal's algorithm

    >>> G.add_edge(0, 3, weight=2)
    >>> edges = mst.maximum_spanning_edges(G, algorithm="kruskal", data=False)
    >>> edgelist = list(edges)
    >>> sorted(sorted(e) for e in edgelist)
    [[0, 1], [0, 3], [1, 2]]

    Find maximum spanning edges by Prim's algorithm

    >>> G.add_edge(0, 3, weight=2)  # assign weight 2 to edge 0-3
    >>> edges = mst.maximum_spanning_edges(G, algorithm="prim", data=False)
    >>> edgelist = list(edges)
    >>> sorted(sorted(e) for e in edgelist)
    [[0, 1], [0, 3], [2, 3]]

    Notes
    -----
    For Borůvka's algorithm, each edge must have a weight attribute, and
    each edge weight must be distinct.

    For the other algorithms, if the graph edges do not have a weight
    attribute a default weight of 1 will be used.

    Modified code from David Eppstein, April 2006
    http://www.ics.uci.edu/~eppstein/PADS/
    """
    try:
        algo = ALGORITHMS[algorithm]
    except KeyError as e:
        msg = f"{algorithm} is not a valid choice for an algorithm."
        raise ValueError(msg) from e

    # The selected generator does the work; ``minimum=False`` flips the sign.
    return algo(G, minimum=False, weight=weight, data=data, ignore_nan=ignore_nan)
@not_implemented_for("multigraph")
def minimum_spanning_tree(G, weight="weight", algorithm="kruskal", ignore_nan=False):
    """Returns a minimum spanning tree or forest on an undirected graph `G`.

    Parameters
    ----------
    G : undirected graph
        An undirected graph. If `G` is connected, then the algorithm finds a
        spanning tree. Otherwise, a spanning forest is found.
    weight : str
        Data key to use for edge weights.
    algorithm : string
        The algorithm to use when finding a minimum spanning tree. Valid
        choices are 'kruskal', 'prim', or 'boruvka'. The default is
        'kruskal'.
    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    Returns
    -------
    G : EasyGraph Graph
        A minimum spanning tree or forest. It is a new instance of the same
        class as `G`, holding every node of `G` and the selected edges
        together with their attributes.

    Examples
    --------
    The example assumes `G` is an undirected 4-cycle on the nodes 0..3
    (edges 0-1, 1-2, 2-3 and 3-0) that was built beforehand.

    >>> G.add_edge(0, 3, weight=2)
    >>> T = eg.minimum_spanning_tree(G)
    >>> sorted(T.edges(data=True))
    [(0, 1, {}), (1, 2, {}), (2, 3, {})]

    Notes
    -----
    For Borůvka's algorithm, each edge must have a weight attribute, and
    each edge weight must be distinct.

    For the other algorithms, if the graph edges do not have a weight
    attribute a default weight of 1 will be used.

    Isolated nodes with self-loops are in the tree as edgeless isolated nodes.
    """
    # Materialise the edges first so that any error surfaces before the
    # result graph is built.
    edges = list(
        minimum_spanning_edges(G, algorithm, weight, data=True, ignore_nan=ignore_nan)
    )
    T = G.__class__()  # Same graph class as G
    # Add every node explicitly so isolated nodes survive in the result.
    for node in G.nodes:
        T.add_node(node)
    for u, v, attrs in edges:
        T.add_edge(u, v, **attrs)
    return T
@not_implemented_for("multigraph")
def maximum_spanning_tree(G, weight="weight", algorithm="kruskal", ignore_nan=False):
    """Returns a maximum spanning tree or forest on an undirected graph `G`.

    Parameters
    ----------
    G : undirected graph
        An undirected graph. If `G` is connected, then the algorithm finds a
        spanning tree. Otherwise, a spanning forest is found.
    weight : str
        Data key to use for edge weights.
    algorithm : string
        The algorithm to use when finding a maximum spanning tree. Valid
        choices are 'kruskal', 'prim', or 'boruvka'. The default is
        'kruskal'.
    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    Returns
    -------
    G : EasyGraph Graph
        A maximum spanning tree or forest. It is a new instance of the same
        class as `G`, holding every node of `G` and the selected edges
        together with their attributes.

    Examples
    --------
    The example assumes `G` is an undirected 4-cycle on the nodes 0..3
    (edges 0-1, 1-2, 2-3 and 3-0) that was built beforehand.

    >>> G.add_edge(0, 3, weight=2)
    >>> T = eg.maximum_spanning_tree(G)
    >>> sorted(T.edges(data=True))
    [(0, 1, {}), (0, 3, {'weight': 2}), (1, 2, {})]

    Notes
    -----
    For Borůvka's algorithm, each edge must have a weight attribute, and
    each edge weight must be distinct.

    For the other algorithms, if the graph edges do not have a weight
    attribute a default weight of 1 will be used.

    There may be more than one tree with the same minimum or maximum weight.
    See :mod:`easygraph.tree.recognition` for more detailed definitions.

    Isolated nodes with self-loops are in the tree as edgeless isolated nodes.
    """
    # Materialise the edges first so that any error surfaces before the
    # result graph is built.
    edges = list(
        maximum_spanning_edges(G, algorithm, weight, data=True, ignore_nan=ignore_nan)
    )
    T = G.__class__()  # Same graph class as G
    # Add every node explicitly so isolated nodes survive in the result.
    for node in G.nodes:
        T.add_node(node)
    for u, v, attrs in edges:
        T.add_edge(u, v, **attrs)
    return T
def edge_boundary(G, nbunch1, nbunch2=None, data=False, default=None):
    """Return an iterator over the edges joining `nbunch1` to `nbunch2`.

    The *edge boundary* of a node set *S* with respect to a node set *T*
    consists of the edges having one endpoint in *S* and the other in *T*.
    When *T* is omitted it is taken to be every node outside *S*.

    Parameters
    ----------
    G : EasyGraph graph
        Must be iterable over its nodes and provide
        ``G.edges(nbunch, data=..., default=...)``.
    nbunch1 : iterable
        Nodes forming the set *S*. Entries that are not nodes of `G`
        are ignored.
    nbunch2 : iterable, optional
        Nodes forming the target set *T*. If ``None``, the complement of
        `nbunch1` is used.
    data : bool or object
        Passed through unchanged to ``G.edges``.
    default : object
        Passed through unchanged to ``G.edges``.

    Returns
    -------
    iterator
        A lazy iterator over the boundary edges, in whatever tuple form
        ``G.edges`` produced them; only the first two entries of each
        tuple (the endpoints) are inspected here.

    Notes
    -----
    `nbunch1` and `nbunch2` are normally disjoint, but this is not
    enforced.
    """
    # NOTE(review): ``G.edges`` is *called* here, while other code in this
    # file iterates ``G.edges`` as a plain attribute — confirm the graph
    # class supports this call form.
    inside = {node for node in G if node in nbunch1}

    # Only edges touching `inside` can be boundary edges. The orientation
    # of each returned edge is not assumed, so both endpoints are tested.
    edges = G.edges(inside, data=data, default=default)

    if nbunch2 is None:
        # Implicit complement: an edge crosses the boundary exactly when
        # one endpoint is inside and the other is not. This avoids
        # building the complement set.
        return (e for e in edges if (e[0] in inside) != (e[1] in inside))

    outside = set(nbunch2)
    return (
        e
        for e in edges
        if (e[0] in inside and e[1] in outside)
        or (e[1] in inside and e[0] in outside)
    )
"""
Union-find data structure.
"""
class UnionFind:
    """Union-find data structure.

    Each unionFind instance X maintains a family of disjoint sets of
    hashable objects, supporting the following two methods:

    - X[item] returns a name for the set containing the given item.
      Each set is named by an arbitrarily-chosen one of its members; as
      long as the set remains unchanged it will keep the same name. If
      the item is not yet part of a set in X, a new singleton set is
      created for it.

    - X.union(item1, item2, ...) merges the sets containing each item
      into a single larger set. If any item is not yet part of a set
      in X, it is added to X as one of the members of the merged set.

    Union-find data structure. Based on Josiah Carlson's code,
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/215912
    with significant additional changes by D. Eppstein.
    http://www.ics.uci.edu/~eppstein/PADS/UnionFind.py
    """

    def __init__(self, elements=None):
        """Create a new empty union-find structure.

        If *elements* is an iterable, this structure will be initialized
        with the discrete partition on the given set of elements.
        """
        if elements is None:
            elements = ()
        # parents[x] is the parent of x in its tree; a root is its own parent.
        self.parents = {}
        # weights[r] is the number of items in the set rooted at r (only
        # kept up to date for roots).
        self.weights = {}
        for x in elements:
            self.weights[x] = 1
            self.parents[x] = x

    def __getitem__(self, object):
        """Find and return the name of the set containing the object.

        An object that has not been seen before is registered as a new
        singleton set and returned as its own name.
        """
        # check for previously unknown object
        if object not in self.parents:
            self.parents[object] = object
            self.weights[object] = 1
            return object

        # find path of objects leading to the root
        path = [object]
        root = self.parents[object]
        while root != path[-1]:
            path.append(root)
            root = self.parents[root]

        # compress the path and return
        for ancestor in path:
            self.parents[ancestor] = root
        return root

    def __iter__(self):
        """Iterate through all items ever found or unioned by this structure."""
        return iter(self.parents)

    def to_sets(self):
        """Iterates over the sets stored in this structure.

        For example::

            >>> partition = UnionFind("xyz")
            >>> sorted(map(sorted, partition.to_sets()))
            [['x'], ['y'], ['z']]
            >>> partition.union("x", "y")
            >>> sorted(map(sorted, partition.to_sets()))
            [['x', 'y'], ['z']]

        """
        # Ensure fully pruned paths: after this loop every item points
        # directly at its root. Only existing keys are reassigned, so the
        # dict does not change size while being iterated.
        for x in self.parents:
            _ = self[x]  # Evaluated for side-effect only

        # Group the items by their (now direct) root.
        sets = {}
        for item, root in self.parents.items():
            sets.setdefault(root, set()).add(item)
        yield from sets.values()

    def union(self, *objects):
        """Find the sets containing the objects and merge them all."""
        # Find the heaviest root according to its weight. Sorting in
        # descending order makes the largest set absorb the smaller ones,
        # which keeps the parent chains short.
        roots = iter(
            sorted(
                {self[x] for x in objects},
                key=lambda r: self.weights[r],
                reverse=True,
            )
        )
        try:
            root = next(roots)
        except StopIteration:
            # Called without arguments: nothing to merge.
            return

        for r in roots:
            self.weights[root] += self.weights[r]
            self.parents[r] = root