# Source code for easygraph.functions.hypergraph.centrality.hypercoreness

from itertools import compress

import easygraph as eg
import numpy as np


__all__ = ["size_independent_hypercoreness", "frequency_based_hypercoreness"]


def _nodes_in(edges):
    """Return the set of node ids that appear in at least one hyperedge."""
    return {node for edge in edges for node in edge}


def _peel_once(hg, num_v, k, m):
    """Run one peeling pass of the (k, m)-core decomposition.

    Nodes whose degree in ``hg`` is below ``k`` are deleted from every
    hyperedge, then hyperedges left with fewer than ``m`` nodes are dropped.
    The hypergraph is rebuilt after each of the two steps so that hyperedges
    and degrees are always read back from easygraph itself.

    Parameters
    ----------
    hg : eg.Hypergraph
        Current state of the decomposition.
    num_v : int
        Number of vertices used when rebuilding the hypergraph.
    k : int
        Minimum degree a node needs to survive this pass.
    m : int
        Minimum size a hyperedge needs to survive this pass.

    Returns
    -------
    tuple
        ``(new_hg, changed)``; ``changed`` is True when this pass removed at
        least one node or one hyperedge.
    """
    edges = hg.e[0]
    degrees = hg.degree_node
    # A set keeps the per-node membership test below O(1).
    weak = {node for node in _nodes_in(edges) if degrees[node] < k}

    shrunk = []
    for edge in edges:
        kept = [node for node in edge if node not in weak]
        if kept:
            shrunk.append(kept)
    hg = eg.Hypergraph(num_v=num_v, e_list=shrunk)

    rebuilt = hg.e[0]
    big_enough = [edge for edge in rebuilt if len(edge) >= m]
    changed = bool(weak) or len(big_enough) != len(rebuilt)
    hg = eg.Hypergraph(num_v=num_v, e_list=big_enough)
    return hg, changed


def _km_shell_indices(h, seed_nodes):
    """Compute the (k, m)-shell index of every node for every order ``m``.

    Only hyperedges with more than one node are considered.  For each order
    ``m`` (from 2 up to the largest hyperedge size) the hypergraph restricted
    to hyperedges of size >= m is peeled for k = 1, 2, ... until it is empty.
    A node that disappears while peeling at level ``k`` gets shell index
    ``k - 1`` for that order.

    Parameters
    ----------
    h : eg.Hypergraph
        Input hypergraph; ``h.e[0]`` and ``h.num_v`` are read.
    seed_nodes : iterable
        Node ids that must be present in the result even if they never show
        up in a hyperedge.

    Returns
    -------
    tuple
        ``(orders, shells, k_max, sizes)`` where ``orders`` is the range of
        considered ``m`` values, ``shells`` maps node id -> array of shell
        indices (one entry per order), ``k_max`` holds the largest ``k`` with
        a non-empty (k, m)-core for each order, and ``sizes`` lists the sizes
        of the considered hyperedges in ascending order.
    """
    num_v = h.num_v
    edges = sorted((edge for edge in h.e[0] if len(edge) > 1), key=len)
    sizes = [len(edge) for edge in edges]
    # Without any usable hyperedge ``orders`` is empty, so every array below
    # has length 0 and the peeling loop is skipped entirely.
    largest = sizes[-1] if sizes else 1
    orders = range(2, largest + 1)

    shells = {node: np.zeros(len(orders)) for node in seed_nodes}
    k_max = np.zeros(len(orders))

    for col, m in enumerate(orders):
        # Start from the sub-hypergraph made of interactions of size >= m.
        hg = eg.Hypergraph(
            num_v=num_v, e_list=[edge for edge in edges if len(edge) >= m]
        )
        k = 0
        # No upper bound on k is needed: once k exceeds the largest degree
        # every node is peeled and the loop stops on the empty core.
        while True:
            k += 1
            before = _nodes_in(hg.e[0])

            # Peel until a full pass removes neither a node nor a hyperedge.
            changed = True
            while changed:
                hg, changed = _peel_once(hg, num_v, k, m)

            survivors = _nodes_in(hg.e[0])
            for node in sorted(before - survivors):
                # Node ids found in hyperedges may lie outside ``seed_nodes``;
                # give them a row instead of failing with KeyError.
                if node not in shells:
                    shells[node] = np.zeros(len(orders))
                shells[node][col] = k - 1

            if not survivors:
                # The (k, m)-core is empty: k - 1 was the maximum connectivity
                # at order m (left at 0 if even the first level is empty).
                if k > 1:
                    k_max[col] = k - 1
                break

    return orders, shells, k_max, sizes


def size_independent_hypercoreness(h):
    """Compute the size-independent hypercoreness of the nodes of a hypergraph.

    For every order ``m`` the (k, m)-shell index of a node is divided by the
    maximum connectivity ``k_max`` reached at that order, and the ratios are
    summed over all orders.

    Parameters
    ----------
    h : eg.Hypergraph
        Input hypergraph.

    Returns
    -------
    dict
        Maps node id to its size-independent hypercoreness (one number per
        node).  The ids ``1 .. h.num_v`` are always present; any other id that
        occurs in a hyperedge of size > 1 is added as well.  All values are 0
        when the hypergraph has no hyperedge with more than one node.

    References
    ----------
    Mancastroppa, M., Iacopini, I., Petri, G. et al. Hyper-cores promote
    localization and efficient seeding in higher-order processes.
    Nat Commun 14, 6223 (2023). https://doi.org/10.1038/s41467-023-41887-2
    """
    # NOTE(review): the seeded ids start at 1 and end at num_v here, but end
    # at num_v - 1 in frequency_based_hypercoreness; confirm the intended id
    # range against eg.Hypergraph.  Both ranges are kept for compatibility.
    _, shells, k_max, _ = _km_shell_indices(h, range(1, h.num_v + 1))
    return {node: sum(per_order / k_max) for node, per_order in shells.items()}


def frequency_based_hypercoreness(h):
    r"""Compute the frequency-based hypercoreness of the nodes of a hypergraph.

    Same construction as the size-independent hypercoreness, except that the
    contribution of each order ``m`` is weighted by ``Psi(m)``, the fraction
    of hyperedges (among those with more than one node) that have size ``m``.

    Parameters
    ----------
    h : eg.Hypergraph
        Input hypergraph.

    Returns
    -------
    dict
        Maps node id to its frequency-based hypercoreness (one number per
        node).  The ids ``1 .. h.num_v - 1`` are always present; any other id
        that occurs in a hyperedge of size > 1 is added as well.  All values
        are 0 when the hypergraph has no hyperedge with more than one node.

    References
    ----------
    Mancastroppa, M., Iacopini, I., Petri, G. et al. Hyper-cores promote
    localization and efficient seeding in higher-order processes.
    Nat Commun 14, 6223 (2023). https://doi.org/10.1038/s41467-023-41887-2
    """
    # NOTE(review): the seeded ids stop at num_v - 1 here, but at num_v in
    # size_independent_hypercoreness; confirm the intended id range against
    # eg.Hypergraph.  Both ranges are kept for compatibility.
    orders, shells, k_max, sizes = _km_shell_indices(h, range(1, h.num_v))

    # Psi(m): distribution of hyperedge sizes.
    psi = np.array([sizes.count(m) / len(sizes) for m in orders])

    return {
        node: sum(psi * per_order / k_max) for node, per_order in shells.items()
    }