Source code for easygraph.functions.path.path

from easygraph.utils import *
from easygraph.utils.decorators import *


__all__ = [
    "Dijkstra",
    "Floyd",
    "Prim",
    "Kruskal",
    "Spfa",
    "single_source_bfs",
    "single_source_dijkstra",
    "multi_source_dijkstra",
]


[docs]@hybrid("cpp_spfa") def Spfa(G, node, weight="weight"): raise EasyGraphError("Please input GraphC or DiGraphC.")
[docs]@not_implemented_for("multigraph") def Dijkstra(G, node, weight="weight"): """Returns the length of paths from the certain node to remaining nodes Parameters ---------- G : graph weighted graph node : int Returns ------- result_dict : dict the length of paths from the certain node to remaining nodes Examples -------- Returns the length of paths from node 1 to remaining nodes >>> Dijkstra(G,node=1,weight="weight") """ return single_source_dijkstra(G, node, weight=weight)
[docs]@not_implemented_for("multigraph") @only_implemented_for_UnDirected_graph @hybrid("cpp_Floyd") def Floyd(G, weight="weight"): """Returns the length of paths from all nodes to remaining nodes Parameters ---------- G : graph weighted graph Returns ------- result_dict : dict the length of paths from all nodes to remaining nodes Examples -------- Returns the length of paths from all nodes to remaining nodes >>> Floyd(G,weight="weight") """ adj = G.adj.copy() result_dict = {} for i in G: result_dict[i] = {} for i in G: temp_key = adj[i].keys() for j in G: if j in temp_key: result_dict[i][j] = adj[i][j].get(weight, 1) else: result_dict[i][j] = float("inf") if i == j: result_dict[i][i] = 0 for k in G: for i in G: for j in G: temp = result_dict[i][k] + result_dict[k][j] if result_dict[i][j] > temp: result_dict[i][j] = temp return result_dict
[docs]@not_implemented_for("multigraph") @only_implemented_for_UnDirected_graph @hybrid("cpp_Prim") def Prim(G, weight="weight"): """Returns the edges that make up the minimum spanning tree Parameters ---------- G : graph weighted graph Returns ------- result_dict : dict the edges that make up the minimum spanning tree Examples -------- Returns the edges that make up the minimum spanning tree >>> Prim(G,weight="weight") """ adj = G.adj.copy() result_dict = {} for i in G: result_dict[i] = {} selected = [] candidate = [] for i in G: if not selected: selected.append(i) else: candidate.append(i) while len(candidate): start = None end = None min_weight = float("inf") for i in selected: for j in candidate: if i in G and j in G[i] and adj[i][j].get(weight, 1) < min_weight: start = i end = j min_weight = adj[i][j].get(weight, 1) if start != None and end != None: result_dict[start][end] = min_weight selected.append(end) candidate.remove(end) else: break return result_dict
[docs]@not_implemented_for("multigraph") @only_implemented_for_UnDirected_graph @hybrid("cpp_Kruskal") def Kruskal(G, weight="weight"): """Returns the edges that make up the minimum spanning tree Parameters ---------- G : graph weighted graph Returns ------- result_dict : dict the edges that make up the minimum spanning tree Examples -------- Returns the edges that make up the minimum spanning tree >>> Kruskal(G,weight="weight") """ adj = G.adj.copy() result_dict = {} edge_list = [] for i in G: result_dict[i] = {} for i in G: for j in G[i]: wt = adj[i][j].get(weight, 1) edge_list.append([i, j, wt]) edge_list.sort(key=lambda a: a[2]) group = [[i] for i in G] for edge in edge_list: for i in range(len(group)): if edge[0] in group[i]: m = i if edge[1] in group[i]: n = i if m != n: result_dict[edge[0]][edge[1]] = edge[2] group[m] = group[m] + group[n] group[n] = [] return result_dict
[docs]@not_implemented_for("multigraph") def single_source_bfs(G, source, target=None): nextlevel = {source: 0} return dict(_single_source_bfs(G.adj, nextlevel, target=target))
def _single_source_bfs(adj, firstlevel, target=None): seen = {} level = 0 nextlevel = firstlevel while nextlevel: thislevel = nextlevel nextlevel = {} for v in thislevel: if v not in seen: seen[v] = level nextlevel.update(adj[v]) yield (v, level) if v == target: break level += 1 del seen
[docs]@not_implemented_for("multigraph") def single_source_dijkstra(G, source, weight="weight", target=None): return multi_source_dijkstra(G, {source}, weight, target=target)
[docs]@not_implemented_for("multigraph") def multi_source_dijkstra(G, sources, weight="weight", target=None): return _dijkstra_multisource(G, sources, weight, target=target)
@hybrid("cpp_dijkstra_multisource") def _dijkstra_multisource(G, sources, weight="weight", target=None): from heapq import heappop from heapq import heappush push = heappush pop = heappop adj = G.adj dist = {} seen = {} from itertools import count c = count() Q = [] for source in sources: seen[source] = 0 push(Q, (0, next(c), source)) while Q: (d, _, v) = pop(Q) if v in dist: continue dist[v] = d if v == target: break for u in adj[v]: cost = adj[v][u].get(weight, 1) vu_dist = dist[v] + cost if u in dist: if vu_dist < dist[u]: raise ValueError("Contradictory paths found:", "negative weights?") elif u not in seen or vu_dist < seen[u]: seen[u] = vu_dist push(Q, (vu_dist, next(c), u)) else: continue return dist