Source code for easygraph.functions.hypergraph.centrality.cycle_ratio

import copy
import itertools

import easygraph as eg


__all__ = [
    "my_all_shortest_paths",
    "getandJudgeSimpleCircle",
    "getSmallestCycles",
    "StatisticsAndCalculateIndicators",
    "cycle_ratio_centrality",
]


SmallestCycles = set()
CycleRatio = {}


[docs] def my_all_shortest_paths(G, source, target): pred = eg.predecessor(G, source) if target not in pred: raise eg.EasyGraphNoPath( f"Target {target} cannot be reached from given sources" ) sources = {source} seen = {target} stack = [[target, 0]] top = 0 while top >= 0: node, i = stack[top] if node in sources: yield [p for p, n in reversed(stack[: top + 1])] if len(pred[node]) > i: stack[top][1] = i + 1 next = pred[node][i] if next in seen: continue else: seen.add(next) top += 1 if top == len(stack): stack.append([next, 0]) else: stack[top][:] = [next, 0] else: seen.discard(node) top -= 1
[docs] def getandJudgeSimpleCircle(objectList): # numEdge = 0 for eleArr in list(itertools.combinations(objectList, 2)): if G.has_edge(eleArr[0], eleArr[1]): numEdge += 1 if numEdge != len(objectList): return False else: return True
[docs] def getSmallestCycles(G, NodeGirth, Coreness, DEF_IMPOSSLEN): NodeList = list(G.nodes) # print(NodeList) NodeList.sort() # setp 1 curCyc = list() for ix in NodeList[:-2]: # v1 if NodeGirth[ix] == 0: continue curCyc.append(ix) for jx in NodeList[NodeList.index(ix) + 1 : -1]: # v2 if NodeGirth[jx] == 0: continue curCyc.append(jx) if G.has_edge(ix, jx): for kx in NodeList[NodeList.index(jx) + 1 :]: # v3 if NodeGirth[kx] == 0: continue if G.has_edge(kx, ix): curCyc.append(kx) if G.has_edge(kx, jx): SmallestCycles.add(tuple(curCyc)) for i in curCyc: NodeGirth[i] = 3 curCyc.pop() curCyc.pop() curCyc.pop() # setp 2 ResiNodeList = [] # Residual Node List for nod in NodeList: if NodeGirth[nod] == DEF_IMPOSSLEN: ResiNodeList.append(nod) if len(ResiNodeList) == 0: return else: visitedNodes = dict.fromkeys(ResiNodeList, set()) for nod in ResiNodeList: if Coreness[nod] == 2 and NodeGirth[nod] < DEF_IMPOSSLEN: continue for nei in list(G.neighbors(nod)): if Coreness[nei] == 2 and NodeGirth[nei] < DEF_IMPOSSLEN: continue if not nei in visitedNodes.keys() or not nod in visitedNodes[nei]: visitedNodes[nod].add(nei) if nei not in visitedNodes.keys(): visitedNodes[nei] = set([nod]) else: visitedNodes[nei].add(nod) if Coreness[nei] == 2 and NodeGirth[nei] < DEF_IMPOSSLEN: continue G.remove_edge(nod, nei) if eg.single_source_dijkstra(G, nod, nei): for path in my_all_shortest_paths(G, nod, nei): lenPath = len(path) path.sort() SmallestCycles.add(tuple(path)) for i in path: if NodeGirth[i] > lenPath: NodeGirth[i] = lenPath G.add_edge(nod, nei) return SmallestCycles
[docs] def StatisticsAndCalculateIndicators(SmallestCyclesOfNodes, CycLenDict): # global NumSmallCycles NumSmallCycles = len(SmallestCycles) for cyc in SmallestCycles: lenCyc = len(cyc) CycLenDict[lenCyc] += 1 for nod in cyc: SmallestCyclesOfNodes[nod].add(cyc) for objNode, SmaCycs in SmallestCyclesOfNodes.items(): if len(SmaCycs) == 0: continue cycleNeighbors = set() NeiOccurTimes = {} for cyc in SmaCycs: for n in cyc: if n in NeiOccurTimes.keys(): NeiOccurTimes[n] += 1 else: NeiOccurTimes[n] = 1 cycleNeighbors = cycleNeighbors.union(cyc) cycleNeighbors.remove(objNode) del NeiOccurTimes[objNode] sum = 0 for nei in cycleNeighbors: sum += float(NeiOccurTimes[nei]) / len(SmallestCyclesOfNodes[nei]) CycleRatio[objNode] = sum + 1 return CycleRatio
[docs] def cycle_ratio_centrality(G): """ Parameters ---------- G : eg.Graph Returns ------- cycle ratio centrality of each node in G : dict Example ------- >>> G = eg.Graph() >>> G.add_edges([(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4), (1, 5), (2, 5)]) >>> cycle_ratio_centrality(G) {1: 4.083333333333333, 2: 4.083333333333333, 3: 2.6666666666666665, 4: 2.6666666666666665, 5: 1.5} """ CycLenDict = dict() NumNode = G.number_of_nodes() # update DEF_IMPOSSLEN = NumNode + 1 # Impossible simple cycle length NodeGirth = dict() CycLenDict = dict() SmallestCyclesOfNodes = {} # removeNodes = set() Coreness = dict(zip(list(G.nodes), eg.k_core(G))) for i in list(G.nodes): # SmallestCyclesOfNodes[i] = set() CycleRatio[i] = 0 if G.degree()[i] <= 1 or Coreness[i] <= 1: NodeGirth[i] = 0 removeNodes.add(i) else: NodeGirth[i] = DEF_IMPOSSLEN # print('NodeGirth:', NodeGirth) G.remove_nodes_from(removeNodes) NodeNum = G.number_of_nodes() for i in range(3, NodeNum + 2): CycLenDict[i] = 0 getSmallestCycles(G, NodeGirth, Coreness, DEF_IMPOSSLEN) cycle_ratio = StatisticsAndCalculateIndicators(SmallestCyclesOfNodes, CycLenDict) return cycle_ratio