Source code for easygraph.functions.path.mst

from heapq import heappop
from heapq import heappush
from itertools import count
from math import isnan
from operator import itemgetter

from easygraph.utils.decorators import *


__all__ = [
    "minimum_spanning_edges",
    "maximum_spanning_edges",
    "minimum_spanning_tree",
    "maximum_spanning_tree",
]


def boruvka_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Iterate over edges of a Borůvka's algorithm min/max spanning tree.

    Parameters
    ----------
    G : EasyGraph Graph
        The edges of `G` must have distinct weights,
        otherwise the edges may not form a tree.

    minimum : bool (default: True)
        Find the minimum (True) or maximum (False) spanning tree.

    weight : string (default: 'weight')
        The name of the edge attribute holding the edge weights.

    data : bool (default: True)
        Flag for whether to yield edge attribute dicts.
        If True, yield edges `(u, v, d)`, where `d` is the attribute dict.
        If False, yield edges `(u, v)`.

    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    """
    # Initialize a forest, assuming initially that it is the discrete
    # partition of the nodes of the graph.
    forest = UnionFind(G)

    def best_edge(component):
        """Returns the optimum (minimum or maximum) edge on the edge
        boundary of the given set of nodes.

        A return value of ``None`` indicates an empty boundary.

        """
        sign = 1 if minimum else -1
        minwt = float("inf")
        boundary = None
        for e in edge_boundary(G, component, data=True):
            wt = e[-1].get(weight, 1) * sign
            if isnan(wt):
                if ignore_nan:
                    continue
                msg = f"NaN found as an edge weight. Edge {e}"
                raise ValueError(msg)
            if wt < minwt:
                minwt = wt
                boundary = e
        return boundary

    # Determine the optimum edge in the edge boundary of each component
    # in the forest.
    best_edges = (best_edge(component) for component in forest.to_sets())
    best_edges = [edge for edge in best_edges if edge is not None]
    # If each entry was ``None``, that means the graph was disconnected,
    # so we are done generating the forest.
    while best_edges:
        # Determine the optimum edge in the edge boundary of each
        # component in the forest.
        #
        # This must be a sequence, not an iterator. In this list, the
        # same edge may appear twice, in different orientations (but
        # that's okay, since a union operation will be called on the
        # endpoints the first time it is seen, but not the second time).
        #
        # Any ``None`` indicates that the edge boundary for that
        # component was empty, so that part of the forest has been
        # completed.
        #
        # TODO This can be parallelized, both in the outer loop over
        # each component in the forest and in the computation of the
        # minimum. (Same goes for the identical lines outside the loop.)
        best_edges = (best_edge(component) for component in forest.to_sets())
        best_edges = [edge for edge in best_edges if edge is not None]
        # Join trees in the forest using the best edges, and yield that
        # edge, since it is part of the spanning tree.
        #
        # TODO This loop can be parallelized, to an extent (the union
        # operation must be atomic).
        for u, v, d in best_edges:
            if forest[u] != forest[v]:
                if data:
                    yield u, v, d
                else:
                    yield u, v
                forest.union(u, v)


@hybrid("cpp_kruskal_mst_edges")
def kruskal_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Iterate over edges of a Kruskal's algorithm min/max spanning tree.

    Parameters
    ----------
    G : EasyGraph Graph
        The graph holding the tree of interest.

    minimum : bool (default: True)
        Find the minimum (True) or maximum (False) spanning tree.

    weight : string (default: 'weight')
        The name of the edge attribute holding the edge weights.

    data : bool (default: True)
        Flag for whether to yield edge attribute dicts.
        If True, yield edges `(u, v, d)`, where `d` is the attribute dict.
        If False, yield edges `(u, v)`.

    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    """
    subtrees = UnionFind()
    edges = []
    for u, v, t in G.edges:
        edges.append((u, v, t))

    def filter_nan_edges(edges=edges, weight=weight):
        sign = 1 if minimum else -1
        for u, v, d in edges:
            wt = d.get(weight, 1) * sign
            if isnan(wt):
                if ignore_nan:
                    continue
                msg = f"NaN found as an edge weight. Edge {(u, v, d)}"
                raise ValueError(msg)
            yield wt, u, v, d

    edges = sorted(filter_nan_edges(), key=itemgetter(0))
    for wt, u, v, d in edges:
        if subtrees[u] != subtrees[v]:
            if data:
                yield (u, v, d)
            else:
                yield (u, v)
            subtrees.union(u, v)


@hybrid("cpp_prim_mst_edges")
def prim_mst_edges(G, minimum=True, weight="weight", data=True, ignore_nan=False):
    """Iterate over edges of Prim's algorithm min/max spanning tree.

    Parameters
    ----------
    G : EasyGraph Graph
        The graph holding the tree of interest.

    minimum : bool (default: True)
        Find the minimum (True) or maximum (False) spanning tree.

    weight : string (default: 'weight')
        The name of the edge attribute holding the edge weights.

    data : bool (default: True)
        Flag for whether to yield edge attribute dicts.
        If True, yield edges `(u, v, d)`, where `d` is the attribute dict.
        If False, yield edges `(u, v)`.

    ignore_nan : bool (default: False)
        If a NaN is found as an edge weight normally an exception is raised.
        If `ignore_nan is True` then that edge is ignored instead.

    """
    push = heappush
    pop = heappop

    nodes = set(G)
    c = count()

    sign = 1 if minimum else -1

    while nodes:
        u = nodes.pop()
        frontier = []
        visited = {u}
        for v, d in G.adj[u].items():
            wt = d.get(weight, 1) * sign
            if isnan(wt):
                if ignore_nan:
                    continue
                msg = f"NaN found as an edge weight. Edge {(u, v, d)}"
                raise ValueError(msg)
            push(frontier, (wt, next(c), u, v, d))
        while frontier:
            W, _, u, v, d = pop(frontier)
            if v in visited or v not in nodes:
                continue
            if data:
                yield u, v, d
            else:
                yield u, v
            # update frontier
            visited.add(v)
            nodes.discard(v)
            for w, d2 in G.adj[v].items():
                if w in visited:
                    continue
                new_weight = d2.get(weight, 1) * sign
                push(frontier, (new_weight, next(c), v, w, d2))


ALGORITHMS = {
    "boruvka": boruvka_mst_edges,
    "borůvka": boruvka_mst_edges,
    "kruskal": kruskal_mst_edges,
    "prim": prim_mst_edges,
}


[docs]@not_implemented_for("multigraph") @only_implemented_for_UnDirected_graph def minimum_spanning_edges( G, algorithm="kruskal", weight="weight", data=True, ignore_nan=False ): """Generate edges in a minimum spanning forest of an undirected weighted graph. A minimum spanning tree is a subgraph of the graph (a tree) with the minimum sum of edge weights. A spanning forest is a union of the spanning trees for each connected component of the graph. Parameters ---------- G : undirected Graph An undirected graph. If `G` is connected, then the algorithm finds a spanning tree. Otherwise, a spanning forest is found. algorithm : string The algorithm to use when finding a minimum spanning tree. Valid choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'. weight : string Edge data key to use for weight (default 'weight'). data : bool, optional If True yield the edge data along with the edge. ignore_nan : bool (default: False) If a NaN is found as an edge weight normally an exception is raised. If `ignore_nan is True` then that edge is ignored instead. Returns ------- edges : iterator An iterator over edges in a maximum spanning tree of `G`. Edges connecting nodes `u` and `v` are represented as tuples: `(u, v, k, d)` or `(u, v, k)` or `(u, v, d)` or `(u, v)` Examples -------- >>> from easygraph.functions.basic import mst Find minimum spanning edges by Kruskal's algorithm >>> G.add_edge(0, 3, weight=2) >>> mst = mst.minimum_spanning_edges(G, algorithm="kruskal", data=False) >>> edgelist = list(mst) >>> sorted(sorted(e) for e in edgelist) [[0, 1], [1, 2], [2, 3]] Find minimum spanning edges by Prim's algorithm >>> G.add_edge(0, 3, weight=2) >>> mst = mst.minimum_spanning_edges(G, algorithm="prim", data=False) >>> edgelist = list(mst) >>> sorted(sorted(e) for e in edgelist) [[0, 1], [1, 2], [2, 3]] Notes ----- For Borůvka's algorithm, each edge must have a weight attribute, and each edge weight must be distinct. For the other algorithms, if the graph edges do not have a weight attribute a default weight of 1 will be used. Modified code from David Eppstein, April 2006 http://www.ics.uci.edu/~eppstein/PADS/ """ try: algo = ALGORITHMS[algorithm] except KeyError as e: msg = f"{algorithm} is not a valid choice for an algorithm." raise ValueError(msg) from e return algo(G, minimum=True, weight=weight, data=data, ignore_nan=ignore_nan)
[docs]@not_implemented_for("multigraph") @only_implemented_for_UnDirected_graph def maximum_spanning_edges( G, algorithm="kruskal", weight="weight", data=True, ignore_nan=False ): """Generate edges in a maximum spanning forest of an undirected weighted graph. A maximum spanning tree is a subgraph of the graph (a tree) with the maximum possible sum of edge weights. A spanning forest is a union of the spanning trees for each connected component of the graph. Parameters ---------- G : undirected Graph An undirected graph. If `G` is connected, then the algorithm finds a spanning tree. Otherwise, a spanning forest is found. algorithm : string The algorithm to use when finding a maximum spanning tree. Valid choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'. weight : string Edge data key to use for weight (default 'weight'). data : bool, optional If True yield the edge data along with the edge. ignore_nan : bool (default: False) If a NaN is found as an edge weight normally an exception is raised. If `ignore_nan is True` then that edge is ignored instead. Returns ------- edges : iterator An iterator over edges in a maximum spanning tree of `G`. Edges connecting nodes `u` and `v` are represented as tuples: `(u, v, k, d)` or `(u, v, k)` or `(u, v, d)` or `(u, v)` Examples -------- >>> from easygraph.functions.basic import mst Find maximum spanning edges by Kruskal's algorithm >>> G.add_edge(0, 3, weight=2) >>> mst = mst.maximum_spanning_edges(G, algorithm="kruskal", data=False) >>> edgelist = list(mst) >>> sorted(sorted(e) for e in edgelist) [[0, 1], [0, 3], [1, 2]] Find maximum spanning edges by Prim's algorithm >>> G.add_edge(0, 3, weight=2) # assign weight 2 to edge 0-3 >>> mst = mst.maximum_spanning_edges(G, algorithm="prim", data=False) >>> edgelist = list(mst) >>> sorted(sorted(e) for e in edgelist) [[0, 1], [0, 3], [2, 3]] Notes ----- For Borůvka's algorithm, each edge must have a weight attribute, and each edge weight must be distinct. For the other algorithms, if the graph edges do not have a weight attribute a default weight of 1 will be used. Modified code from David Eppstein, April 2006 http://www.ics.uci.edu/~eppstein/PADS/ """ try: algo = ALGORITHMS[algorithm] except KeyError as e: msg = f"{algorithm} is not a valid choice for an algorithm." raise ValueError(msg) from e return algo(G, minimum=False, weight=weight, data=data, ignore_nan=ignore_nan)
[docs]@not_implemented_for("multigraph") def minimum_spanning_tree(G, weight="weight", algorithm="kruskal", ignore_nan=False): """Returns a minimum spanning tree or forest on an undirected graph `G`. Parameters ---------- G : undirected graph An undirected graph. If `G` is connected, then the algorithm finds a spanning tree. Otherwise, a spanning forest is found. weight : str Data key to use for edge weights. algorithm : string The algorithm to use when finding a minimum spanning tree. Valid choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'. ignore_nan : bool (default: False) If a NaN is found as an edge weight normally an exception is raised. If `ignore_nan is True` then that edge is ignored instead. Returns ------- G : EasyGraph Graph A minimum spanning tree or forest. Examples -------- >>> G.add_edge(0, 3, weight=2) >>> T = eg.minimum_spanning_tree(G) >>> sorted(T.edges(data=True)) [(0, 1, {}), (1, 2, {}), (2, 3, {})] Notes ----- For Borůvka's algorithm, each edge must have a weight attribute, and each edge weight must be distinct. For the other algorithms, if the graph edges do not have a weight attribute a default weight of 1 will be used. Isolated nodes with self-loops are in the tree as edgeless isolated nodes. """ edges = list( minimum_spanning_edges(G, algorithm, weight, data=True, ignore_nan=ignore_nan) ) T = G.__class__() # Same graph class as G for i in G.nodes: T.add_node(i) for i in edges: (u, v, t) = i T.add_edge(u, v, **t) return T
[docs]@not_implemented_for("multigraph") def maximum_spanning_tree(G, weight="weight", algorithm="kruskal", ignore_nan=False): """Returns a maximum spanning tree or forest on an undirected graph `G`. Parameters ---------- G : undirected graph An undirected graph. If `G` is connected, then the algorithm finds a spanning tree. Otherwise, a spanning forest is found. weight : str Data key to use for edge weights. algorithm : string The algorithm to use when finding a maximum spanning tree. Valid choices are 'kruskal', 'prim', or 'boruvka'. The default is 'kruskal'. ignore_nan : bool (default: False) If a NaN is found as an edge weight normally an exception is raised. If `ignore_nan is True` then that edge is ignored instead. Returns ------- G : EasyGraph Graph A maximum spanning tree or forest. Examples -------- >>> G.add_edge(0, 3, weight=2) >>> T = eg.maximum_spanning_tree(G) >>> sorted(T.edges(data=True)) [(0, 1, {}), (0, 3, {'weight': 2}), (1, 2, {})] Notes ----- For Borůvka's algorithm, each edge must have a weight attribute, and each edge weight must be distinct. For the other algorithms, if the graph edges do not have a weight attribute a default weight of 1 will be used. There may be more than one tree with the same minimum or maximum weight. See :mod:`easygraph.tree.recognition` for more detailed definitions. Isolated nodes with self-loops are in the tree as edgeless isolated nodes. """ edges = list( maximum_spanning_edges(G, algorithm, weight, data=True, ignore_nan=ignore_nan) ) T = G.__class__() # Same graph class as G for i in G.nodes: T.add_node(i) for i in edges: (u, v, t) = i T.add_edge(u, v, **t) return T
def edge_boundary(G, nbunch1, nbunch2=None, data=False, default=None): """Returns the edge boundary of `nbunch1`. The *edge boundary* of a set *S* with respect to a set *T* is the set of edges (*u*, *v*) such that *u* is in *S* and *v* is in *T*. If *T* is not specified, it is assumed to be the set of all nodes not in *S*. Parameters ---------- G : EasyGraph graph nbunch1 : iterable Iterable of nodes in the graph representing the set of nodes whose edge boundary will be returned. (This is the set *S* from the definition above.) nbunch2 : iterable Iterable of nodes representing the target (or "exterior") set of nodes. (This is the set *T* from the definition above.) If not specified, this is assumed to be the set of all nodes in `G` not in `nbunch1`. data : bool or object This parameter has the same meaning as in :meth:`MultiGraph.edges`. default : object This parameter has the same meaning as in :meth:`MultiGraph.edges`. Returns ------- iterator An iterator over the edges in the boundary of `nbunch1` with respect to `nbunch2`. If `keys`, `data`, or `default` are specified and `G` is a multigraph, then edges are returned with keys and/or data, as in :meth:`MultiGraph.edges`. Notes ----- Any element of `nbunch` that is not in the graph `G` will be ignored. `nbunch1` and `nbunch2` are usually meant to be disjoint, but in the interest of speed and generality, that is not required here. """ nset1 = {v for v in G if v in nbunch1} # Here we create an iterator over edges incident to nodes in the set # `nset1`. The `Graph.edges()` method does not provide a guarantee # on the orientation of the edges, so our algorithm below must # handle the case in which exactly one orientation, either (u, v) or # (v, u), appears in this iterable. edges = G.edges(nset1, data=data, default=default) # If `nbunch2` is not provided, then it is assumed to be the set # complement of `nbunch1`. For the sake of efficiency, this is # implemented by using the `not in` operator, instead of by creating # an additional set and using the `in` operator. if nbunch2 is None: return (e for e in edges if (e[0] in nset1) ^ (e[1] in nset1)) nset2 = set(nbunch2) return ( e for e in edges if (e[0] in nset1 and e[1] in nset2) or (e[1] in nset1 and e[0] in nset2) ) """ Union-find data structure. """ class UnionFind: """Union-find data structure. Each unionFind instance X maintains a family of disjoint sets of hashable objects, supporting the following two methods: - X[item] returns a name for the set containing the given item. Each set is named by an arbitrarily-chosen one of its members; as long as the set remains unchanged it will keep the same name. If the item is not yet part of a set in X, a new singleton set is created for it. - X.union(item1, item2, ...) merges the sets containing each item into a single larger set. If any item is not yet part of a set in X, it is added to X as one of the members of the merged set. Union-find data structure. Based on Josiah Carlson's code, http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/215912 with significant additional changes by D. Eppstein. http://www.ics.uci.edu/~eppstein/PADS/UnionFind.py """ def __init__(self, elements=None): """Create a new empty union-find structure. If *elements* is an iterable, this structure will be initialized with the discrete partition on the given set of elements. """ if elements is None: elements = () self.parents = {} self.weights = {} for x in elements: self.weights[x] = 1 self.parents[x] = x def __getitem__(self, object): """Find and return the name of the set containing the object.""" # check for previously unknown object if object not in self.parents: self.parents[object] = object self.weights[object] = 1 return object # find basic of objects leading to the root path = [object] root = self.parents[object] while root != path[-1]: path.append(root) root = self.parents[root] # compress the basic and return for ancestor in path: self.parents[ancestor] = root return root def __iter__(self): """Iterate through all items ever found or unioned by this structure.""" return iter(self.parents) def to_sets(self): """Iterates over the sets stored in this structure. For example:: >>> partition = UnionFind("xyz") >>> sorted(map(sorted, partition.to_sets())) [['x'], ['y'], ['z']] >>> partition.union("x", "y") >>> sorted(map(sorted, partition.to_sets())) [['x', 'y'], ['z']] """ # Ensure fully pruned paths def groups(parents: dict): sets = {} for v, k in parents.items(): if k not in sets: sets[k] = set() sets[k].add(v) return sets for x in self.parents.keys(): _ = self[x] # Evaluated for side-effect only yield from groups(self.parents).values() def union(self, *objects): """Find the sets containing the objects and merge them all.""" # Find the heaviest root according to its weight. roots = iter(sorted({self[x] for x in objects}, key=lambda r: self.weights[r])) try: root = next(roots) except StopIteration: return for r in roots: self.weights[root] += self.weights[r] self.parents[r] = root